Skip to content

Commit

Permalink
Functions: allow collectors to be restarted (netdata#15983)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktsaou authored Sep 16, 2023
1 parent 638d9b0 commit 11de4e4
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 29 deletions.
2 changes: 2 additions & 0 deletions database/rrd.h
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,8 @@ typedef enum __attribute__ ((__packed__)) rrdhost_flags {

RRDHOST_FLAG_METADATA_CLAIMID = (1 << 28), // metadata needs to be stored in the database
RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED = (1 << 29), // set when the receiver part is disconnected

RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED = (1 << 30), // set when the host has updated global functions
} RRDHOST_FLAGS;

#define rrdhost_flag_check(host, flag) (__atomic_load_n(&((host)->flags), __ATOMIC_SEQ_CST) & (flag))
Expand Down
98 changes: 69 additions & 29 deletions database/rrdfunctions.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,11 @@ struct rrd_collector {
static __thread struct rrd_collector *thread_rrd_collector = NULL;

static void rrd_collector_free(struct rrd_collector *rdc) {
if(rdc->running)
return;

int32_t expected = 0;
if(likely(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))) {
if(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
// the collector is still referenced by charts.
// leave it hanging there, the last chart will actually free it.
return;
Expand All @@ -323,9 +326,9 @@ static void rrd_collector_free(struct rrd_collector *rdc) {

// called once per collector
void rrd_collector_started(void) {
if(likely(thread_rrd_collector)) return;
if(!thread_rrd_collector)
thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));

thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));
thread_rrd_collector->tid = gettid();
thread_rrd_collector->running = true;
}
Expand All @@ -341,43 +344,70 @@ void rrd_collector_finished(void) {
}

static struct rrd_collector *rrd_collector_acquire(void) {
__atomic_add_fetch(&thread_rrd_collector->refcount, 1, __ATOMIC_SEQ_CST);
rrd_collector_started();

int32_t expected = __atomic_load_n(&thread_rrd_collector->refcount, __ATOMIC_RELAXED), wanted = 0;
do {
if(expected < 0 || !thread_rrd_collector->running) {
internal_fatal(true, "FUNCTIONS: Trying to acquire a collector that is exiting.");
return thread_rrd_collector;
}

wanted = expected + 1;

} while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount, &expected, wanted, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));

return thread_rrd_collector;
}

static void rrd_collector_release(struct rrd_collector *rdc) {
if(unlikely(!rdc)) return;

int32_t refcount = __atomic_sub_fetch(&rdc->refcount, 1, __ATOMIC_SEQ_CST);
if(refcount == 0 && !rdc->running)
int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0;
do {
if(expected < 0) {
internal_fatal(true, "FUNCTIONS: Trying to release a collector that is exiting.");
return;
}

if(expected == 0) {
internal_fatal(true, "FUNCTIONS: Trying to release a collector that is not acquired.");
return;
}

wanted = expected - 1;

} while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));

if(wanted == 0)
rrd_collector_free(rdc);
}

static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
void *rrdhost __maybe_unused) {
static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost) {
RRDHOST *host = rrdhost; (void)host;
struct rrd_collector_function *rdcf = func;

if(!thread_rrd_collector)
fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.",
__FUNCTION__, dictionary_acquired_item_name(item));

rrd_collector_started();
rdcf->collector = rrd_collector_acquire();

// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s",
// dictionary_acquired_item_name(item), rrdhost_hostname(host),
// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running");
}

static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func,
void *rrdhost __maybe_unused) {
struct rrd_collector_function *rdcf = func;
rrd_collector_release(rdcf->collector);
}

static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
void *new_func __maybe_unused, void *rrdhost __maybe_unused) {
static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func,
void *new_func, void *rrdhost) {
RRDHOST *host = rrdhost; (void)host;
struct rrd_collector_function *rdcf = func;
struct rrd_collector_function *new_rdcf = new_func;

if(!thread_rrd_collector)
fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.",
__FUNCTION__, dictionary_acquired_item_name(item));
rrd_collector_started();

bool changed = false;

Expand Down Expand Up @@ -417,6 +447,10 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_
changed = true;
}

// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s",
// dictionary_acquired_item_name(item), rrdhost_hostname(host),
// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running");

return changed;
}

Expand Down Expand Up @@ -460,6 +494,8 @@ void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int

if(st)
dictionary_view_set(st->functions_view, key, item);
else
rrdhost_flag_set(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED);

dictionary_acquired_item_release(host->functions, item);
}
Expand All @@ -481,6 +517,8 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) {
}

void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) {
rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED);

struct rrd_collector_function *tmp;
dfe_start_read(host->functions, tmp) {
if(!(tmp->options & RRD_FUNCTION_GLOBAL))
Expand Down Expand Up @@ -565,20 +603,22 @@ static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, s
char *s = NULL;

*rdcf = NULL;
while(!(*rdcf) && buffer[0]) {
*rdcf = dictionary_get(host->functions, buffer);
if(*rdcf) break;
if(host->functions) {
while (!(*rdcf) && buffer[0]) {
*rdcf = dictionary_get(host->functions, buffer);
if (*rdcf) break;

// if s == NULL, set it to the end of the buffer
// this should happen only the first time
if(unlikely(!s))
s = &buffer[key_length - 1];
// if s == NULL, set it to the end of the buffer
// this should happen only the first time
if (unlikely(!s))
s = &buffer[key_length - 1];

// skip a word from the end
while(s >= buffer && !isspace(*s)) *s-- = '\0';
// skip a word from the end
while (s >= buffer && !isspace(*s)) *s-- = '\0';

// skip all spaces
while(s >= buffer && isspace(*s)) *s-- = '\0';
// skip all spaces
while (s >= buffer && isspace(*s)) *s-- = '\0';
}
}

buffer_flush(wb);
Expand Down
1 change: 1 addition & 0 deletions libnetdata/required_dummies.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ void rrdset_thread_rda_free(void){};
void sender_thread_buffer_free(void){};
void query_target_free(void){};
void service_exits(void){};
void rrd_collector_finished(void){};

// required by get_system_cpus()
char *netdata_configured_host_prefix = "";
Expand Down
2 changes: 2 additions & 0 deletions libnetdata/threads/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ void rrdset_thread_rda_free(void);
void sender_thread_buffer_free(void);
void query_target_free(void);
void service_exits(void);
void rrd_collector_finished(void);

static void thread_cleanup(void *ptr) {
if(netdata_thread != ptr) {
Expand All @@ -188,6 +189,7 @@ static void thread_cleanup(void *ptr) {
if(!(netdata_thread->options & NETDATA_THREAD_OPTION_DONT_LOG_CLEANUP))
netdata_log_info("thread with task id %d finished", gettid());

rrd_collector_finished();
sender_thread_buffer_free();
rrdset_thread_rda_free();
query_target_free();
Expand Down
6 changes: 6 additions & 0 deletions streaming/rrdpush.c
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,12 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}

if(unlikely(host_flags & RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED)) {
BUFFER *wb = sender_start(host->sender);
rrd_functions_expose_global_rrdpush(host, wb);
sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_METADATA);
}

RRDSET_FLAGS rrdset_flags = __atomic_load_n(&st->flags, __ATOMIC_SEQ_CST);
bool exposed_upstream = (rrdset_flags & RRDSET_FLAG_UPSTREAM_EXPOSED);
bool replication_in_progress = !(rrdset_flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
Expand Down

0 comments on commit 11de4e4

Please sign in to comment.