From 11de4e4ab77177bc1a4f9b6358151adf525f2ca0 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Sat, 16 Sep 2023 16:00:42 +0300 Subject: [PATCH] Functions: allow collectors to be restarted (#15983) --- database/rrd.h | 2 + database/rrdfunctions.c | 98 ++++++++++++++++++++++++----------- libnetdata/required_dummies.h | 1 + libnetdata/threads/threads.c | 2 + streaming/rrdpush.c | 6 +++ 5 files changed, 80 insertions(+), 29 deletions(-) diff --git a/database/rrd.h b/database/rrd.h index 9ff838559cde64..19af417e036172 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -931,6 +931,8 @@ typedef enum __attribute__ ((__packed__)) rrdhost_flags { RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected + + RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED = (1 << 30), // set when the host has updated global functions } RRDHOST_FLAGS; #define rrdhost_flag_check(host, flag) (__atomic_load_n(&((host)->flags), __ATOMIC_SEQ_CST) & (flag)) diff --git a/database/rrdfunctions.c b/database/rrdfunctions.c index 7a097ab9cae3ba..81a911c4892d81 100644 --- a/database/rrdfunctions.c +++ b/database/rrdfunctions.c @@ -310,8 +310,11 @@ struct rrd_collector { static __thread struct rrd_collector *thread_rrd_collector = NULL; static void rrd_collector_free(struct rrd_collector *rdc) { + if(rdc->running) + return; + int32_t expected = 0; - if(likely(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))) { + if(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { // the collector is still referenced by charts. // leave it hanging there, the last chart will actually free it. return; @@ -323,9 +326,9 @@ static void rrd_collector_free(struct rrd_collector *rdc) { // called once per collector void rrd_collector_started(void) { - if(likely(thread_rrd_collector)) return; + if(!thread_rrd_collector) + thread_rrd_collector = callocz(1, sizeof(struct rrd_collector)); - thread_rrd_collector = callocz(1, sizeof(struct rrd_collector)); thread_rrd_collector->tid = gettid(); thread_rrd_collector->running = true; } @@ -341,43 +344,70 @@ void rrd_collector_finished(void) { } static struct rrd_collector *rrd_collector_acquire(void) { - __atomic_add_fetch(&thread_rrd_collector->refcount, 1, __ATOMIC_SEQ_CST); + rrd_collector_started(); + + int32_t expected = __atomic_load_n(&thread_rrd_collector->refcount, __ATOMIC_RELAXED), wanted = 0; + do { + if(expected < 0 || !thread_rrd_collector->running) { + internal_fatal(true, "FUNCTIONS: Trying to acquire a collector that is exiting."); + return thread_rrd_collector; + } + + wanted = expected + 1; + + } while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount, &expected, wanted, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); + return thread_rrd_collector; } static void rrd_collector_release(struct rrd_collector *rdc) { if(unlikely(!rdc)) return; - int32_t refcount = __atomic_sub_fetch(&rdc->refcount, 1, __ATOMIC_SEQ_CST); - if(refcount == 0 && !rdc->running) + int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0; + do { + if(expected < 0) { + internal_fatal(true, "FUNCTIONS: Trying to release a collector that is exiting."); + return; + } + + if(expected == 0) { + internal_fatal(true, "FUNCTIONS: Trying to release a collector that is not acquired."); + return; + } + + wanted = expected - 1; + + } while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); + + if(wanted == 0) rrd_collector_free(rdc); } -static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, - void *rrdhost __maybe_unused) { +static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost) { + RRDHOST *host = rrdhost; (void)host; struct rrd_collector_function *rdcf = func; - if(!thread_rrd_collector) - fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.", - __FUNCTION__, dictionary_acquired_item_name(item)); - + rrd_collector_started(); rdcf->collector = rrd_collector_acquire(); + +// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s", +// dictionary_acquired_item_name(item), rrdhost_hostname(host), +// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running"); } -static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, +static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost __maybe_unused) { struct rrd_collector_function *rdcf = func; rrd_collector_release(rdcf->collector); } -static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, - void *new_func __maybe_unused, void *rrdhost __maybe_unused) { +static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, + void *new_func, void *rrdhost) { + RRDHOST *host = rrdhost; (void)host; struct rrd_collector_function *rdcf = func; struct rrd_collector_function *new_rdcf = new_func; - if(!thread_rrd_collector) - fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.", - __FUNCTION__, dictionary_acquired_item_name(item)); + rrd_collector_started(); bool changed = false; @@ -417,6 +447,10 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_ changed = true; } +// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s", +// dictionary_acquired_item_name(item), rrdhost_hostname(host), +// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running"); + return changed; } @@ -460,6 +494,8 @@ void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int if(st) dictionary_view_set(st->functions_view, key, item); + else + rrdhost_flag_set(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED); dictionary_acquired_item_release(host->functions, item); } @@ -481,6 +517,8 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) { } void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) { + rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED); + struct rrd_collector_function *tmp; dfe_start_read(host->functions, tmp) { if(!(tmp->options & RRD_FUNCTION_GLOBAL)) @@ -565,20 +603,22 @@ static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, s char *s = NULL; *rdcf = NULL; - while(!(*rdcf) && buffer[0]) { - *rdcf = dictionary_get(host->functions, buffer); - if(*rdcf) break; + if(host->functions) { + while (!(*rdcf) && buffer[0]) { + *rdcf = dictionary_get(host->functions, buffer); + if (*rdcf) break; - // if s == NULL, set it to the end of the buffer - // this should happen only the first time - if(unlikely(!s)) - s = &buffer[key_length - 1]; + // if s == NULL, set it to the end of the buffer + // this should happen only the first time + if (unlikely(!s)) + s = &buffer[key_length - 1]; - // skip a word from the end - while(s >= buffer && !isspace(*s)) *s-- = '\0'; + // skip a word from the end + while (s >= buffer && !isspace(*s)) *s-- = '\0'; - // skip all spaces - while(s >= buffer && isspace(*s)) *s-- = '\0'; + // skip all spaces + while (s >= buffer && isspace(*s)) *s-- = '\0'; + } } buffer_flush(wb); diff --git a/libnetdata/required_dummies.h b/libnetdata/required_dummies.h index 5a0d4e050215b5..1ffe1e9e53cc02 100644 --- a/libnetdata/required_dummies.h +++ b/libnetdata/required_dummies.h @@ -37,6 +37,7 @@ void rrdset_thread_rda_free(void){}; void sender_thread_buffer_free(void){}; void query_target_free(void){}; void service_exits(void){}; +void rrd_collector_finished(void){}; // required by get_system_cpus() char *netdata_configured_host_prefix = ""; diff --git a/libnetdata/threads/threads.c b/libnetdata/threads/threads.c index ae3c7106d94510..a26eba73416309 100644 --- a/libnetdata/threads/threads.c +++ b/libnetdata/threads/threads.c @@ -178,6 +178,7 @@ void rrdset_thread_rda_free(void); void sender_thread_buffer_free(void); void query_target_free(void); void service_exits(void); +void rrd_collector_finished(void); static void thread_cleanup(void *ptr) { if(netdata_thread != ptr) { @@ -188,6 +189,7 @@ static void thread_cleanup(void *ptr) { if(!(netdata_thread->options & NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP)) netdata_log_info("thread with task id %d finished", gettid()); + rrd_collector_finished(); sender_thread_buffer_free(); rrdset_thread_rda_free(); query_target_free(); diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index 72c7c6786f9496..df7db6ed8fcb58 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -489,6 +489,12 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS); } + if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) { + BUFFER *wb = sender_start(host->sender); + rrd_functions_expose_global_rrdpush(host, wb); + sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA); + } + RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST); bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED); bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);