Skip to content

Commit

Permalink
instrumentation: core, revision 3
Browse files Browse the repository at this point in the history
Signed-off-by: falamatt@amazon.com <falamatt@amazon.com>
  • Loading branch information
matthewfala committed Oct 21, 2022
1 parent 97a5e9d commit f8b9179
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 0 deletions.
68 changes: 68 additions & 0 deletions include/fluent-bit/flb_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
#include <inttypes.h>
#include <errno.h>

// Instrumentation start
#include <sys/time.h>
// Instrumentation end

/* FIXME: this extern should be auto-populated from flb_thread_storage.h */
extern FLB_TLS_DEFINE(struct flb_log, flb_log_ctx)

Expand Down Expand Up @@ -105,6 +109,70 @@ int flb_log_set_file(struct flb_config *config, char *out);
int flb_log_destroy(struct flb_log *log, struct flb_config *config);
void flb_log_print(int type, const char *file, int line, const char *fmt, ...);

/* Instrumentation patch */
void flb_log_single_event(const char* tag, const char* value);
void flb_log_recurring_event(const char* tag, const char* value);
void flb_log_recurring_event_prefixed(const char* tag_suffix, const char* value);

/* Instrumentation macros */
#define flb_log_load_counter(dest, name) \
if (flb_output_thread_instance_get()) { \
dest = ++flb_output_thread_instance_get()->th->name; } \
else { static int __FLB_LOG_LOAD_##name = 0; \
__FLB_LOG_LOAD_##name++; \
dest = __FLB_LOG_LOAD_##name; }

#define flb_log_update_counter(dest, name, update) \
if (flb_output_thread_instance_get()) { \
dest = flb_output_thread_instance_get()->th->name+=update; } \
else { static int __FLB_LOG_LOAD_##name = 0; \
__FLB_LOG_LOAD_##name+=update; \
dest = __FLB_LOG_LOAD_##name; }

#define __flb_log_recurring_TOKEN_PASTE(x, y) x##y
#define __flb_log_recurring_CAT(x,y) __flb_log_recurring_TOKEN_PASTE(x,y)
#define __flb_log_unique_variable(SYMBOL) \
__flb_log_recurring_CAT(SYMBOL, __LINE__)

#define flb_log_recurring_event_prefixed(tag_suffix, value) \
static char __flb_log_unique_variable(FLB_LOG_tag)[100]; \
struct flb_out_thread_instance *__flb_log_unique_variable(out) = \
flb_output_thread_instance_get(); \
if (!__flb_log_unique_variable(out)) { \
sprintf( __flb_log_unique_variable(FLB_LOG_tag), "flb_engine_%s", \
tag_suffix); \
} \
else { \
sprintf( __flb_log_unique_variable(FLB_LOG_tag), \
"%s_worker-%d_%s", \
__flb_log_unique_variable(out)->ins->name, \
__flb_log_unique_variable(out)->th->id, \
tag_suffix); \
} \
flb_log_recurring_event(__flb_log_unique_variable(FLB_LOG_tag), value);

#define flb_log_time_code_prefixed(tag_suffix, code) \
struct timeval time; \
gettimeofday(&time, NULL); \
int start_time = time.tv_sec * 1000 + time.tv_usec / 1000; \
code; \
char timedelta[50]; \
gettimeofday(&time, NULL); \
int end_time = time.tv_sec * 1000 + time.tv_usec / 1000; \
sprintf(timedelta, "%d", end_time - start_time); \
flb_log_recurring_event_prefixed(tag_suffix, timedelta);

#define flb_log_time_tick() \
struct timeval time; \
gettimeofday(&time, NULL); \
int start_time = time.tv_sec * 1000 + time.tv_usec / 1000;

#define flb_log_time_tock(tag_suffix) \
char timedelta[50]; \
gettimeofday(&time, NULL); \
int end_time = time.tv_sec * 1000 + time.tv_usec / 1000; \
sprintf(timedelta, "%d", end_time - start_time); \
flb_log_recurring_event_prefixed(tag_suffix, timedelta);

/* Logging macros */
#define flb_helper(fmt, ...) \
Expand Down
98 changes: 98 additions & 0 deletions src/flb_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,101 @@ int flb_log_destroy(struct flb_log *log, struct flb_config *config)

return 0;
}

/* Instrumentation start */
static inline long flb_log_timestamp() {
struct timeval time;
gettimeofday(&time, NULL);
return time.tv_sec * 1000 + time.tv_usec / 1000;
}

char* folder_path;
char* single_event_file_name = "_metrics.txt";
static char session[100] = {0};
static char buffer[PATH_MAX];
static char session_path[PATH_MAX];
static char single_event_path[PATH_MAX] = {0};

static void recover_session() {
long milliseconds;
int cursor = 0;

/* get session */
if (session[0] == 0) {
folder_path = getenv("FLB_INSTRUMENTATION_OUT_PATH");
if (!folder_path) {
strcpy(buffer, getenv("HOME"));
cursor+=strlen(getenv("HOME"));
strcpy(buffer+cursor, "/instrumentation");
mkdir(buffer, 0755);
folder_path = buffer;
}
cursor = 0;
milliseconds = flb_log_timestamp();
sprintf(session, "%ld", milliseconds);
strcpy(session_path, folder_path);
cursor+=strlen(folder_path);
session_path[cursor++] = (char)'/';
strcpy(session_path+cursor, session);
session_path[cursor++] = (char)'/';
mkdir(session_path, 0755);
}
}

void flb_log_single_event(const char* tag, const char* value) {
FILE *file;
int cursor = 0;

/* get session */
recover_session();

/* get _metrics.txt path */
if (single_event_path[0] == 0) {
strcpy(single_event_path, session_path);
cursor += strlen(session_path);
single_event_path[cursor++] = '/';
strcpy(single_event_path+cursor, single_event_file_name);
}

/* check file exists */
if ((file = fopen(single_event_path, "a"))) {
fprintf(file, "%s: %s\n", tag, value);
fflush(file);
fclose(file);
}
else {
flb_error("[flb_log] could not open single event file");
}
}

void flb_log_recurring_event(const char* tag, const char* value) {
FILE *file;
char full_path[PATH_MAX];
long milliseconds;
int cursor = 0;

/* get session */
recover_session();

/* get full path */
cursor = 0;
strcpy(full_path, session_path);
cursor += strlen(session_path);
full_path[cursor++] = '/';
strcpy(full_path+cursor, tag);
cursor+=strlen(tag);
strcpy(full_path+cursor, ".csv");

/* check file exists */
if ((file = fopen(full_path, "a"))) {
milliseconds = flb_log_timestamp();
fprintf(file, "%ld, %s\n", milliseconds, value);
fflush(file);
fclose(file);
}
else {
flb_error("[flb_log] could not open recurring event file");
}
}

/* Instrumentation end */

0 comments on commit f8b9179

Please sign in to comment.