Skip to content

Commit

Permalink
Fixes #1597 - Add an option for non-structured vflow record ids and u…
Browse files Browse the repository at this point in the history
…se this option for the emitted site record.
  • Loading branch information
ted-ross committed Sep 19, 2024
1 parent 3b8cdd6 commit 61c0b1c
Showing 1 changed file with 92 additions and 24 deletions.
116 changes: 92 additions & 24 deletions src/vanflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,17 @@
#define RATE_SLOT_COUNT 5
#define IDENTITY_MAX 27
#define DEFERRED_DELETION_TICKS 25 // Five seconds
#define VFLOW_ID_CUSTOM 0xffffffffffffffff

//
// If the record_id value is VFLOW_ID_CUSTOM, use the full_id for arbitrary strings, otherwise use ${s.source_id}:${record_id}
//
typedef struct vflow_identity_t {
uint64_t record_id;
char source_id[ROUTER_ID_SIZE];
union {
char source_id[ROUTER_ID_SIZE];
char *full_id;
} s;
} vflow_identity_t;

typedef struct vflow_attribute_data_t {
Expand Down Expand Up @@ -273,15 +280,20 @@ static void _vflow_next_id(vflow_identity_t *identity)
{
sys_mutex_lock(&state->id_lock);
identity->record_id = state->next_identity++;
memcpy(identity->source_id, state->router_id, ROUTER_ID_SIZE);
memcpy(identity->s.source_id, state->router_id, ROUTER_ID_SIZE);
sys_mutex_unlock(&state->id_lock);
}


static char *_vflow_id_to_new_string(const vflow_identity_t *identity)
{
char *result = (char*) malloc(IDENTITY_MAX);
snprintf(result, IDENTITY_MAX, "%s:%"PRIu64, identity->source_id, identity->record_id);
char *result;
if (identity->record_id == VFLOW_ID_CUSTOM) {
result = qd_strdup(identity->s.full_id);
} else {
result = (char*) malloc(IDENTITY_MAX);
snprintf(result, IDENTITY_MAX, "%s:%"PRIu64, identity->s.source_id, identity->record_id);
}
return result;
}

Expand All @@ -295,9 +307,13 @@ static char *_vflow_id_to_new_string(const vflow_identity_t *identity)
*/
static void _vflow_strncat_id(char *buffer, size_t n, const vflow_identity_t *id)
{
char text[IDENTITY_MAX + 1];
snprintf(text, IDENTITY_MAX, "%s:%"PRIu64, id->source_id, id->record_id);
strncat(buffer, text, n);
if (id->record_id == VFLOW_ID_CUSTOM) {
strcat(buffer, id->s.full_id);
} else {
char text[IDENTITY_MAX + 1];
snprintf(text, IDENTITY_MAX, "%s:%"PRIu64, id->s.source_id, id->record_id);
strncat(buffer, text, n);
}
}


Expand All @@ -312,8 +328,8 @@ static void _vflow_strncat_id(char *buffer, size_t n, const vflow_identity_t *id
static bool _vflow_parse_id_iter(vflow_identity_t *identity, qd_iterator_t *iter)
{
identity->record_id = 0;
size_t source_size = qd_iterator_ncopy(iter, (uint8_t*) identity->source_id, ROUTER_ID_SIZE - 1);
identity->source_id[source_size] = '\0';
size_t source_size = qd_iterator_ncopy(iter, (uint8_t*) identity->s.source_id, ROUTER_ID_SIZE - 1);
identity->s.source_id[source_size] = '\0';
if (source_size != ROUTER_ID_SIZE - 1) {
return false;
}
Expand All @@ -336,7 +352,7 @@ static bool _vflow_parse_id_iter(vflow_identity_t *identity, qd_iterator_t *iter
identity->record_id = (identity->record_id * 10) + (digit - '0');
}

return true;
return identity->record_id != VFLOW_ID_CUSTOM;
}


Expand Down Expand Up @@ -533,7 +549,7 @@ static void _vflow_start_record_TH(vflow_work_t *work, bool discard)
// Check to see if this co-record's base record is from the same source. If so,
// we must find the base record and link it to the co-record for local processing.
//
if (strncmp(record->identity.source_id, state->router_id, ROUTER_ID_SIZE) == 0) {
if (strncmp(record->identity.s.source_id, state->router_id, ROUTER_ID_SIZE) == 0) {
vflow_record_t *base_record = _vflow_find_biflow_TH(record->identity.record_id);
if (!!base_record) {
base_record->co_record_peer = record;
Expand Down Expand Up @@ -917,6 +933,13 @@ static void _vflow_free_record_TH(vflow_record_t *record, bool recursive)
data = DEQ_HEAD(record->attributes);
}

//
// Free the full-id if appropriate
//
if (record->identity.record_id == VFLOW_ID_CUSTOM) {
free(record->identity.s.full_id);
}

//
// Free the record
//
Expand Down Expand Up @@ -1129,9 +1152,13 @@ static void _vflow_emit_unflushed_as_events_TH(qdr_core_t *core, vflow_record_li
qd_compose_insert_uint(field, record->record_type);

qd_compose_insert_uint(field, VFLOW_ATTRIBUTE_IDENTITY);
char identity[IDENTITY_MAX + 1];
snprintf(identity, IDENTITY_MAX, "%s:%"PRIu64, record->identity.source_id, record->identity.record_id);
qd_compose_insert_string(field, identity);
if (record->identity.record_id == VFLOW_ID_CUSTOM) {
qd_compose_insert_string(field, record->identity.s.full_id);
} else {
char identity[IDENTITY_MAX + 1];
snprintf(identity, IDENTITY_MAX, "%s:%"PRIu64, record->identity.s.source_id, record->identity.record_id);
qd_compose_insert_string(field, identity);
}

vflow_attribute_data_t *data = DEQ_HEAD(record->attributes);
while (data) {
Expand Down Expand Up @@ -1221,7 +1248,7 @@ static void _vflow_emit_co_records_TH(qdr_core_t *core, vflow_record_list_t *unf

while (!DEQ_IS_EMPTY(*unflushed_records)) {
vflow_record_t *start_record = DEQ_HEAD(*unflushed_records);
const char *base_source_id = start_record->identity.source_id;
const char *base_source_id = start_record->identity.s.source_id;

//
// Build a working list of unflushed co-records that belong to the same base source.
Expand All @@ -1231,7 +1258,7 @@ static void _vflow_emit_co_records_TH(qdr_core_t *core, vflow_record_list_t *unf
vflow_record_t *next_record;
do {
next_record = DEQ_NEXT_N(UNFLUSHED, record);
if (strncmp(base_source_id, record->identity.source_id, ROUTER_ID_SIZE) == 0) {
if (strncmp(base_source_id, record->identity.s.source_id, ROUTER_ID_SIZE) == 0) {
DEQ_REMOVE_N(UNFLUSHED, *unflushed_records, record);
DEQ_INSERT_TAIL_N(UNFLUSHED, working_list, record);
}
Expand All @@ -1247,7 +1274,7 @@ static void _vflow_emit_co_records_TH(qdr_core_t *core, vflow_record_list_t *unf
//
const size_t address_length = ROUTER_ID_SIZE + 7;
char base_source_address[address_length];
snprintf(base_source_address, address_length, "vfcr.%s:0", base_source_id);
snprintf(base_source_address, address_length, "%s%s:0", co_record_address_prefix, base_source_id);
_vflow_emit_unflushed_as_events_TH(core, &working_list, base_source_address);
}
_vflow_clean_unflushed_TH(&working_list);
Expand Down Expand Up @@ -1774,6 +1801,29 @@ static void _vflow_on_heartbeat(void *context)
}


vflow_record_t *vflow_start_record_custom_id(vflow_record_type_t record_type, vflow_record_t *parent, const char *id)
{
vflow_record_t *record = new_vflow_record_t();
vflow_work_t *work = _vflow_work(_vflow_start_record_TH);
ZERO(record);
record->record_type = record_type;
record->parent = parent;
record->flush_slot = -1;
record->never_logged = true;
record->force_log = false;
record->ended = false;

work->record = record;
work->value64 = _now_in_usec();

record->identity.record_id = VFLOW_ID_CUSTOM;
record->identity.s.full_id = qd_strdup(id);

_vflow_post_work(work);
return record;
}


//=====================================================================================
// Public Functions
//=====================================================================================
Expand Down Expand Up @@ -1853,8 +1903,12 @@ void vflow_serialize_identity(const vflow_record_t *record, qd_composed_field_t
char buffer[IDENTITY_MAX + 1];
assert(!!record);
if (!!record) {
snprintf(buffer, IDENTITY_MAX, "%s:%"PRIu64, record->identity.source_id, record->identity.record_id);
qd_compose_insert_string(field, buffer);
if (record->identity.record_id == VFLOW_ID_CUSTOM) {
qd_compose_insert_string(field, record->identity.s.full_id);
} else {
snprintf(buffer, IDENTITY_MAX, "%s:%"PRIu64, record->identity.s.source_id, record->identity.record_id);
qd_compose_insert_string(field, buffer);
}
}
}

Expand All @@ -1864,8 +1918,12 @@ void vflow_serialize_identity_pn(const vflow_record_t *record, pn_data_t *data)
char buffer[IDENTITY_MAX + 1];
assert(!!record);
if (!!record) {
snprintf(buffer, IDENTITY_MAX, "%s:%"PRIu64, record->identity.source_id, record->identity.record_id);
pn_data_put_string(data, pn_bytes(strlen(buffer), buffer));
if (record->identity.record_id == VFLOW_ID_CUSTOM) {
pn_data_put_string(data, pn_bytes(strlen(record->identity.s.full_id), record->identity.s.full_id));
} else {
snprintf(buffer, IDENTITY_MAX, "%s:%"PRIu64, record->identity.s.source_id, record->identity.record_id);
pn_data_put_string(data, pn_bytes(strlen(buffer), buffer));
}
}
}

Expand Down Expand Up @@ -2104,7 +2162,14 @@ QD_EXPORT qd_error_t qd_dispatch_configure_site(qd_dispatch_t *qd, qd_entity_t *
return qd_error_code();
}

vflow_record_t *site = vflow_start_record(VFLOW_RECORD_SITE, 0);
vflow_record_t *site;

if (!!state->site_id) {
site = vflow_start_record_custom_id(VFLOW_RECORD_SITE, 0, state->site_id);
} else {
site = vflow_start_record(VFLOW_RECORD_SITE, 0);
}

if (name) vflow_set_string(site, VFLOW_ATTRIBUTE_NAME, name);
if (location) vflow_set_string(site, VFLOW_ATTRIBUTE_LOCATION, location);
if (provider) vflow_set_string(site, VFLOW_ATTRIBUTE_PROVIDER, provider);
Expand Down Expand Up @@ -2184,11 +2249,14 @@ static void _vflow_process_co_record_TH(vflow_work_t *work, bool discard)
static void _vflow_on_co_record_map(qd_parsed_field_t *co_record)
{
vflow_record_type_t record_type = VFLOW_RECORD_SITE; // Any value other than BIFLOW_TPORT
vflow_identity_t identity = {0, ""};
vflow_identity_t identity;
vflow_attribute_data_list_t attributes;
uint32_t item_count = qd_parse_sub_count(co_record);
bool input_error = false;

identity.record_id = 0;
identity.s.source_id[0] = '\0';

DEQ_INIT(attributes);
for (uint32_t i = 0; i < item_count; i++) {
qd_parsed_field_t *key = qd_parse_sub_key(co_record, i);
Expand Down Expand Up @@ -2241,7 +2309,7 @@ static void _vflow_on_co_record_map(qd_parsed_field_t *co_record)
input_error = true;
}

if (strncmp(identity.source_id, state->router_id, ROUTER_ID_SIZE) != 0) {
if (strncmp(identity.s.source_id, state->router_id, ROUTER_ID_SIZE) != 0) {
input_error = true;
}

Expand Down

0 comments on commit 61c0b1c

Please sign in to comment.