Queries Progress (netdata#16574)
* track the progress of queries

* add query_progress in libnetdata Makefile.am

* add acl, response size and response code to the tracking

* define the required functions

* fix the last commit

* added /api/v2/progress?transaction=ID to report the progress of queries

* added function to report netdata-queries

* track hashtable additions

* when reusing a transaction, maintain the counter

* keep track of linked and indexing

* added X-Forwarded-Host and X-Forwarded-For to logs. X-Forwarded-For is also added in progress tracking

* report compact uuids to match logs; register the actual duration of the transaction

* added rowOptions to function; now web_client keeps track if it tracks progress or not

* add http request method to progress

* add tags per function; /api/vX/functions is now not protected

* compact the sanitization array

* split pluginsd_parser into multiple files

* cleanup keyword definitions

* code cleanup

* extracted rrd_collector to separate files

* added http access level to functions

* renamed access "all" to "any"

* implemented optional protection on functions

* add priority to functions, to allow the UI to select the best function (lower priority) when the user has not selected a function

* added progress report from the plugins to netdata and from children to parents - untested

* added progress reporting in systemd-journal

* query timeout is now handled by evloop for external plugins

* propagate progress reports to children and plugins

* fix codeql warning

* adapt to cmake

* minor changes

* extend function timeout when progress is received; added streaming capability to propagate progress reports to parents and send progress requests to children

* revert change in dictionary.h

* add log when access level is invalid

* update access level of functions

* added logs when processing progress updates

* log when the deferred response is too big

* comment out sender progress to find the issue

* added missing newline in streaming progress reports

* propagate progress reports to functions

* fix logs
ktsaou authored Dec 15, 2023
1 parent 0c8b46c commit da32dd8
Showing 84 changed files with 4,645 additions and 3,307 deletions.
17 changes: 16 additions & 1 deletion CMakeLists.txt
@@ -389,6 +389,8 @@ set(LIBNETDATA_FILES ${CONFIG_H}
 libnetdata/popen/popen.h
 libnetdata/procfile/procfile.c
 libnetdata/procfile/procfile.h
+libnetdata/query_progress/progress.c
+libnetdata/query_progress/progress.h
 libnetdata/required_dummies.h
 libnetdata/socket/security.c
 libnetdata/socket/security.h
@@ -411,6 +413,9 @@ set(LIBNETDATA_FILES ${CONFIG_H}
 libnetdata/string/utf8.h
 libnetdata/worker_utilization/worker_utilization.c
 libnetdata/worker_utilization/worker_utilization.h
+libnetdata/http/http_access.c
+libnetdata/http/http_access.h
+libnetdata/http/http_defs.c
 libnetdata/http/http_defs.h
 libnetdata/dyn_conf/dyn_conf.c
 libnetdata/dyn_conf/dyn_conf.h)
@@ -644,8 +649,16 @@ endif()

 set(PLUGINSD_PLUGIN_FILES collectors/plugins.d/plugins_d.c
 collectors/plugins.d/plugins_d.h
+collectors/plugins.d/pluginsd_dyncfg.c
+collectors/plugins.d/pluginsd_dyncfg.h
+collectors/plugins.d/pluginsd_functions.c
+collectors/plugins.d/pluginsd_functions.h
+collectors/plugins.d/pluginsd_internals.c
+collectors/plugins.d/pluginsd_internals.h
 collectors/plugins.d/pluginsd_parser.c
-collectors/plugins.d/pluginsd_parser.h)
+collectors/plugins.d/pluginsd_parser.h
+collectors/plugins.d/pluginsd_replication.c
+collectors/plugins.d/pluginsd_replication.h)

 set(RRD_PLUGIN_FILES database/contexts/api_v1.c
 database/contexts/api_v2.c
@@ -662,6 +675,8 @@ set(RRD_PLUGIN_FILES database/contexts/api_v1.c
 database/rrdcalc.h
 database/rrdcalctemplate.c
 database/rrdcalctemplate.h
+database/rrdcollector.c
+database/rrdcollector.h
 database/rrddim.c
 database/rrddimvar.c
 database/rrddimvar.h
4 changes: 2 additions & 2 deletions aclk/aclk_query.c
@@ -106,8 +106,8 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
 char *start, *end;

 struct web_client *w = web_client_get_from_cache();
-w->acl = WEB_CLIENT_ACL_ACLK;
-w->mode = WEB_CLIENT_MODE_GET;
+w->acl = HTTP_ACL_ACLK;
+w->mode = HTTP_REQUEST_MODE_GET;
 w->timings.tv_in = query->created_tv;

 w->interrupt.callback = aclk_web_client_interrupt_cb;
10 changes: 7 additions & 3 deletions collectors/apps.plugin/apps_plugin.c
@@ -13,11 +13,15 @@
 #define APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION "Detailed information on the currently running processes."

 #define APPS_PLUGIN_FUNCTIONS() do { \
-fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " \"processes\" %d \"%s\"\n", PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT, APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION); \
+fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " \"processes\" %d \"%s\" \"top\" \"members\" %d\n", \
+PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT, APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION, \
+RRDFUNCTIONS_PRIORITY_DEFAULT / 10); \
 } while(0)

 #define APPS_PLUGIN_GLOBAL_FUNCTIONS() do { \
-fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"processes\" %d \"%s\"\n", PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT, APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION); \
+fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"processes\" %d \"%s\" \"top\" \"members\" %d\n", \
+PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT, APPS_PLUGIN_PROCESSES_FUNCTION_DESCRIPTION, \
+RRDFUNCTIONS_PRIORITY_DEFAULT / 10); \
 } while(0)

 // ----------------------------------------------------------------------------
@@ -4393,7 +4397,7 @@ static void apps_plugin_function_processes_help(const char *transaction) {
 buffer_json_add_array_item_double(wb, _tmp); \
 } while(0)

-static void function_processes(const char *transaction, char *function __maybe_unused, int timeout __maybe_unused, bool *cancelled __maybe_unused) {
+static void function_processes(const char *transaction, char *function __maybe_unused, usec_t *stop_monotonic_ut __maybe_unused, bool *cancelled __maybe_unused) {
 struct pid_stat *p;

 char *words[PLUGINSD_MAX_WORDS] = { NULL };
28 changes: 19 additions & 9 deletions collectors/cgroups.plugin/cgroup-internals.h
@@ -452,15 +452,25 @@ static inline char *cgroup_chart_type(char *buffer, struct cgroup *cg) {
 }

 #define RRDFUNCTIONS_CGTOP_HELP "View running containers"
-
-int cgroup_function_cgroup_top(BUFFER *wb, int timeout, const char *function, void *collector_data,
-rrd_function_result_callback_t result_cb, void *result_cb_data,
-rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
-rrd_function_register_canceller_cb_t register_canceller_cb, void *register_canceller_cb_data);
-int cgroup_function_systemd_top(BUFFER *wb, int timeout, const char *function, void *collector_data,
-rrd_function_result_callback_t result_cb, void *result_cb_data,
-rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
-rrd_function_register_canceller_cb_t register_canceller_cb, void *register_canceller_cb_data);
+#define RRDFUNCTIONS_SYSTEMD_SERVICES_HELP "View systemd services"
+
+int cgroup_function_cgroup_top(uuid_t *transaction, BUFFER *wb,
+usec_t *stop_monotonic_ut, const char *function, void *collector_data,
+rrd_function_result_callback_t result_cb, void *result_cb_data,
+rrd_function_progress_cb_t progress_cb, void *progress_cb_data,
+rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
+rrd_function_register_canceller_cb_t register_canceller_cb, void *register_canceller_cb_data,
+rrd_function_register_progresser_cb_t register_progresser_cb,
+void *register_progresser_cb_data);
+
+int cgroup_function_systemd_top(uuid_t *transaction, BUFFER *wb,
+usec_t *stop_monotonic_ut, const char *function, void *collector_data,
+rrd_function_result_callback_t result_cb, void *result_cb_data,
+rrd_function_progress_cb_t progress_cb, void *progress_cb_data,
+rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
+rrd_function_register_canceller_cb_t register_canceller_cb, void *register_canceller_cb_data,
+rrd_function_register_progresser_cb_t register_progresser_cb,
+void *register_progresser_cb_data);

 void cgroup_netdev_link_init(void);
 const DICTIONARY_ITEM *cgroup_netdev_get(struct cgroup *cg);
32 changes: 20 additions & 12 deletions collectors/cgroups.plugin/cgroup-top.c
@@ -97,12 +97,16 @@ void cgroup_netdev_get_bandwidth(struct cgroup *cg, NETDATA_DOUBLE *received, NE
 *sent = t->sent[slot];
 }

-int cgroup_function_cgroup_top(BUFFER *wb, int timeout __maybe_unused, const char *function __maybe_unused,
-void *collector_data __maybe_unused,
-rrd_function_result_callback_t result_cb, void *result_cb_data,
-rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
-rrd_function_register_canceller_cb_t register_canceller_cb __maybe_unused,
-void *register_canceller_cb_data __maybe_unused) {
+int cgroup_function_cgroup_top(uuid_t *transaction __maybe_unused, BUFFER *wb,
+usec_t *stop_monotonic_ut __maybe_unused, const char *function __maybe_unused,
+void *collector_data __maybe_unused,
+rrd_function_result_callback_t result_cb, void *result_cb_data,
+rrd_function_progress_cb_t progress_cb, void *progress_cb_data,
+rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
+rrd_function_register_canceller_cb_t register_canceller_cb __maybe_unused,
+void *register_canceller_cb_data __maybe_unused,
+rrd_function_register_progresser_cb_t register_progresser_cb __maybe_unused,
+void *register_progresser_cb_data __maybe_unused) {

 buffer_flush(wb);
 wb->content_type = CT_APPLICATION_JSON;
@@ -342,12 +346,16 @@ int cgroup_function_cgroup_top(BUFFER *wb, int timeout __maybe_unused, const cha
 return response;
 }

-int cgroup_function_systemd_top(BUFFER *wb, int timeout __maybe_unused, const char *function __maybe_unused,
-void *collector_data __maybe_unused,
-rrd_function_result_callback_t result_cb, void *result_cb_data,
-rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
-rrd_function_register_canceller_cb_t register_canceller_cb __maybe_unused,
-void *register_canceller_cb_data __maybe_unused) {
+int cgroup_function_systemd_top(uuid_t *transaction __maybe_unused, BUFFER *wb,
+usec_t *stop_monotonic_ut __maybe_unused, const char *function __maybe_unused,
+void *collector_data __maybe_unused,
+rrd_function_result_callback_t result_cb, void *result_cb_data,
+rrd_function_progress_cb_t progress_cb __maybe_unused, void *progress_cb_data __maybe_unused,
+rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
+rrd_function_register_canceller_cb_t register_canceller_cb __maybe_unused,
+void *register_canceller_cb_data __maybe_unused,
+rrd_function_register_progresser_cb_t register_progresser_cb __maybe_unused,
+void *register_progresser_cb_data __maybe_unused) {

 buffer_flush(wb);
 wb->content_type = CT_APPLICATION_JSON;
10 changes: 8 additions & 2 deletions collectors/cgroups.plugin/sys_fs_cgroup.c
@@ -1671,8 +1671,14 @@ void *cgroups_main(void *ptr) {
 // for the other nodes, the origin server should register it
 rrd_collector_started(); // this creates a collector that runs for as long as netdata runs
 cgroup_netdev_link_init();
-rrd_function_add(localhost, NULL, "containers-vms", 10, RRDFUNCTIONS_CGTOP_HELP, true, cgroup_function_cgroup_top, NULL);
-rrd_function_add(localhost, NULL, "systemd-services", 10, RRDFUNCTIONS_CGTOP_HELP, true, cgroup_function_systemd_top, NULL);
+
+rrd_function_add(localhost, NULL, "containers-vms", 10, RRDFUNCTIONS_PRIORITY_DEFAULT / 2,
+RRDFUNCTIONS_CGTOP_HELP, "top", HTTP_ACCESS_ANY,
+true, cgroup_function_cgroup_top, NULL);
+
+rrd_function_add(localhost, NULL, "systemd-services", 10, RRDFUNCTIONS_PRIORITY_DEFAULT / 3,
+RRDFUNCTIONS_SYSTEMD_SERVICES_HELP, "top", HTTP_ACCESS_ANY,
+true, cgroup_function_systemd_top, NULL);

 heartbeat_t hb;
 heartbeat_init(&hb);
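The registrations above pass a priority derived from RRDFUNCTIONS_PRIORITY_DEFAULT, and the commit message says the UI should prefer the function with the lower priority value when the user has not selected one. The following is a minimal sketch of that selection rule only. The struct, the helper name, and the value 100 for the default priority are hypothetical and do not come from the Netdata sources shown here.

```c
#include <assert.h>
#include <stddef.h>

// Assumed value for illustration; the real RRDFUNCTIONS_PRIORITY_DEFAULT
// is not shown in this diff.
#define EXAMPLE_PRIORITY_DEFAULT 100

// Hypothetical record of a registered function and its priority.
struct fn_entry {
    const char *name;
    int priority;   // lower value = preferred
};

// Returns the entry with the lowest priority value, or NULL when the list
// is empty. On ties, the entry that appears first wins.
static const struct fn_entry *pick_default_function(const struct fn_entry *fns, size_t count) {
    assert(fns != NULL || count == 0);

    const struct fn_entry *best = NULL;
    for (size_t i = 0; i < count; i++) {
        if (best == NULL || fns[i].priority < best->priority)
            best = &fns[i];
    }
    return best;
}
```

With the divisors used in this commit (default / 2, / 3 and / 10), "processes" would be preferred over "systemd-services", which in turn would be preferred over "containers-vms", assuming integer division of a positive default.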
20 changes: 13 additions & 7 deletions collectors/diskspace.plugin/plugin_diskspace.c
@@ -636,12 +636,16 @@ static void diskspace_main_cleanup(void *ptr) {
 #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 3
 #endif

-int diskspace_function_mount_points(BUFFER *wb, int timeout __maybe_unused, const char *function __maybe_unused,
-void *collector_data __maybe_unused,
-rrd_function_result_callback_t result_cb, void *result_cb_data,
-rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
-rrd_function_register_canceller_cb_t register_canceller_cb __maybe_unused,
-void *register_canceller_cb_data __maybe_unused) {
+int diskspace_function_mount_points(uuid_t *transaction __maybe_unused, BUFFER *wb,
+usec_t *stop_monotonic_ut __maybe_unused, const char *function __maybe_unused,
+void *collector_data __maybe_unused,
+rrd_function_result_callback_t result_cb, void *result_cb_data,
+rrd_function_progress_cb_t progress_cb __maybe_unused, void *progress_cb_data __maybe_unused,
+rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
+rrd_function_register_canceller_cb_t register_canceller_cb __maybe_unused,
+void *register_canceller_cb_data __maybe_unused,
+rrd_function_register_progresser_cb_t register_progresser_cb __maybe_unused,
+void *register_progresser_cb_data __maybe_unused) {

 buffer_flush(wb);
 wb->content_type = CT_APPLICATION_JSON;
@@ -865,7 +869,9 @@ void *diskspace_main(void *ptr) {
 worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");

 rrd_collector_started();
-rrd_function_add(localhost, NULL, "mount-points", 10, RRDFUNCTIONS_DISKSPACE_HELP, true, diskspace_function_mount_points, NULL);
+rrd_function_add(localhost, NULL, "mount-points", 10, RRDFUNCTIONS_PRIORITY_DEFAULT, RRDFUNCTIONS_DISKSPACE_HELP,
+"top", HTTP_ACCESS_ANY,
+true, diskspace_function_mount_points, NULL);

 netdata_thread_cleanup_push(diskspace_main_cleanup, ptr);

3 changes: 1 addition & 2 deletions collectors/ebpf.plugin/ebpf_functions.c
@@ -639,10 +639,9 @@ void ebpf_socket_read_open_connections(BUFFER *buf, struct ebpf_module *em)
 */
 static void ebpf_function_socket_manipulation(const char *transaction,
 char *function __maybe_unused,
-int timeout __maybe_unused,
+usec_t *stop_monotonic_ut __maybe_unused,
 bool *cancelled __maybe_unused)
 {
-UNUSED(timeout);
 ebpf_module_t *em = &ebpf_modules[EBPF_MODULE_SOCKET_IDX];

 char *words[PLUGINSD_MAX_WORDS] = {NULL};
3 changes: 2 additions & 1 deletion collectors/ebpf.plugin/ebpf_functions.h
@@ -6,7 +6,8 @@
 #ifdef NETDATA_DEV_MODE
 // Common
 static inline void EBPF_PLUGIN_FUNCTIONS(const char *NAME, const char *DESC) {
-fprintf(stdout, "%s \"%s\" 10 \"%s\"\n", PLUGINSD_KEYWORD_FUNCTION, NAME, DESC);
+fprintf(stdout, "%s \"%s\" 10 \"%s\" \"top\" \"any\" %d\n",
+PLUGINSD_KEYWORD_FUNCTION, NAME, DESC, RRDFUNCTIONS_PRIORITY_DEFAULT);
 }
 #endif

5 changes: 3 additions & 2 deletions collectors/freeipmi.plugin/freeipmi_plugin.c
@@ -23,7 +23,8 @@
 #include "libnetdata/required_dummies.h"

 #define FREEIPMI_GLOBAL_FUNCTION_SENSORS() do { \
-fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"ipmi-sensors\" %d \"%s\"\n", 5, "Displays current sensor state and readings"); \
+fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"ipmi-sensors\" %d \"%s\" \"top\" \"any\" %d\n", \
+5, "Displays current sensor state and readings", 100); \
 } while(0)

 // component names, based on our patterns
@@ -1470,7 +1471,7 @@ static const char *get_sensor_function_priority(struct sensor *sn) {
 }
 }

-static void freeimi_function_sensors(const char *transaction, char *function __maybe_unused, int timeout __maybe_unused, bool *cancelled __maybe_unused) {
+static void freeimi_function_sensors(const char *transaction, char *function __maybe_unused, usec_t *stop_monotonic_ut __maybe_unused, bool *cancelled __maybe_unused) {
 time_t expires = now_realtime_sec() + update_every;

 BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
2 changes: 1 addition & 1 deletion collectors/log2journal/log2journal.c
@@ -67,7 +67,7 @@ static inline HASHED_KEY *get_key_from_hashtable(LOG_JOB *jb, HASHED_KEY *k) {

 if(!k->hashtable_ptr) {
 HASHED_KEY *ht_key;
-SIMPLE_HASHTABLE_SLOT_KEY *slot = simple_hashtable_get_slot_KEY(&jb->hashtable, k->hash, true);
+SIMPLE_HASHTABLE_SLOT_KEY *slot = simple_hashtable_get_slot_KEY(&jb->hashtable, k->hash, NULL, true);
 if((ht_key = SIMPLE_HASHTABLE_SLOT_DATA(slot))) {
 if(!(ht_key->flags & HK_COLLISION_CHECKED)) {
 ht_key->flags |= HK_COLLISION_CHECKED;
5 changes: 1 addition & 4 deletions collectors/log2journal/log2journal.h
@@ -99,10 +99,7 @@ static inline void freez(void *ptr) {
 // hashtable for HASHED_KEY

 // cleanup hashtable defines
-#undef SIMPLE_HASHTABLE_SORT_FUNCTION
-#undef SIMPLE_HASHTABLE_VALUE_TYPE
-#undef SIMPLE_HASHTABLE_NAME
-#undef NETDATA_SIMPLE_HASHTABLE_H
+#include "../../libnetdata/simple_hashtable_undef.h"

 struct hashed_key;
 static inline int compare_keys(struct hashed_key *k1, struct hashed_key *k2);
7 changes: 6 additions & 1 deletion collectors/plugins.d/README.md
@@ -134,6 +134,7 @@ Netdata parses lines starting with:
 - `FLUSH` - ignore the last collected values
 - `DISABLE` - disable this plugin
 - `FUNCTION` - define functions
+- `FUNCTION_PROGRESS` - report the progress of a function execution
 - `FUNCTION_RESULT_BEGIN` - to initiate the transmission of function results
 - `FUNCTION_RESULT_END` - to end the transmission of function results

@@ -146,6 +147,7 @@ Netdata may send the following commands to the plugin's `stdin`:
 - `FUNCTION` - to call a specific function, with all parameters inline
 - `FUNCTION_PAYLOAD` - to call a specific function, with a payload of parameters
 - `FUNCTION_PAYLOAD_END` - to end the payload of parameters
+- `FUNCTION_CANCEL` - cancel a running function transaction

 ### Command line parameters

@@ -466,7 +468,10 @@ The `source` is an integer field that can have the following values:

 The plugin can register functions to Netdata, like this:

-> FUNCTION [GLOBAL] "name and parameters of the function" timeout "help string for users"
+> FUNCTION [GLOBAL] "name and parameters of the function" timeout "help string for users" "tags" "access"
+- Tags currently recognized are either `top` or `logs` (or both, space separated).
+- Access is one of `any`, `members`, or `admins`.

 A function can be used by users to ask for more information from the collector. Netdata maintains a registry of functions in 2 levels:

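The README change above documents the new `"tags"` and `"access"` fields of the `FUNCTION` registration line, and the collectors changed in this commit also append a numeric priority after them. The following is a minimal sketch of a plugin-side helper that builds such a line. `format_function_line` and `is_known_access` are hypothetical names, not part of Netdata, and the trailing priority field is taken from the `fprintf` calls in this diff rather than from the README text.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

// Access levels listed in the README text above.
static bool is_known_access(const char *access) {
    return strcmp(access, "any") == 0 ||
           strcmp(access, "members") == 0 ||
           strcmp(access, "admins") == 0;
}

// Hypothetical helper: writes
//   FUNCTION [GLOBAL] "name" timeout "help" "tags" "access" priority
// into out. Returns the number of characters written, or -1 when the
// access level is unknown or the buffer is too small.
static int format_function_line(char *out, size_t size, bool global,
                                const char *name, int timeout_s, const char *help,
                                const char *tags, const char *access, int priority) {
    assert(out != NULL && size > 0);

    if (!is_known_access(access))
        return -1;

    int n = snprintf(out, size, "FUNCTION %s\"%s\" %d \"%s\" \"%s\" \"%s\" %d\n",
                     global ? "GLOBAL " : "", name, timeout_s, help, tags, access, priority);

    if (n < 0 || (size_t)n >= size)
        return -1;

    return n;
}
```

A plugin would write the resulting line to its `stdout`, as the `fprintf(stdout, ...)` macros in apps.plugin and freeipmi.plugin do. The sketch does not escape quotes inside `name` or `help`, so callers are assumed to pass strings without embedded double quotes.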