Skip to content

Commit

Permalink
filterx/filterx-scope: implement LogMessage change tracking in Filter…
Browse files Browse the repository at this point in the history
…XScope

Previously message-tied variables were managed in part within expr-variable
and in part within FilterXScope. Now with the message being available
in FilterXScope, we can delegate this in entirety to FilterXScope.

This also implements the validation of message-tied values, so if the
LogMessage changes independently from FilterXScope, we will notice that too
and consider the values of those variables stale.

Signed-off-by: Balazs Scheidler <balazs.scheidler@axoflow.com>
  • Loading branch information
bazsi committed Jan 24, 2025
1 parent e56a787 commit 8248c38
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 113 deletions.
87 changes: 34 additions & 53 deletions lib/filterx/expr-variable.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,31 +40,6 @@ typedef struct _FilterXVariableExpr
guint32 handle_is_macro:1;
} FilterXVariableExpr;

static FilterXObject *
_pull_variable_from_message(FilterXVariableExpr *self, FilterXEvalContext *context, LogMessage *msg)
{
gssize value_len;
LogMessageValueType t;
const gchar *value = log_msg_get_value_if_set_with_type(msg, self->handle, &value_len, &t);
if (!value)
{
filterx_eval_push_error("No such name-value pair in the log message", &self->super, self->variable_name);
return NULL;
}

if (self->handle_is_macro)
return filterx_message_value_new(value, value_len, t);
else
return filterx_message_value_new_borrowed(value, value_len, t);
}

/* NOTE: unset on a variable that only exists in the LogMessage, without making the message writable */
static void
_whiteout_variable(FilterXVariableExpr *self, FilterXEvalContext *context)
{
filterx_scope_register_variable(context->scope, FX_VAR_MESSAGE_TIED, self->handle, NULL);
}

static FilterXObject *
_eval_variable(FilterXExpr *s)
{
Expand All @@ -75,21 +50,20 @@ _eval_variable(FilterXExpr *s)
variable = filterx_scope_lookup_variable(context->scope, self->handle);
if (variable)
{
FilterXObject *value = filterx_variable_get_value(variable);
FilterXObject *value = filterx_scope_get_variable(context->scope, variable);
if (!value)
{
filterx_eval_push_error("Variable is unset", &self->super, self->variable_name);
}
return value;
}

if (filterx_variable_handle_is_message_tied(self->handle))
if (self->variable_type == FX_VAR_MESSAGE_TIED)
{
FilterXObject *msg_ref = _pull_variable_from_message(self, context, context->msg);
if(!msg_ref)
return NULL;
filterx_scope_register_variable(context->scope, FX_VAR_MESSAGE_TIED, self->handle, msg_ref);
return msg_ref;
/* auto register message tied variables */
variable = filterx_scope_register_variable(context->scope, self->variable_type, self->handle);
if (variable)
return filterx_variable_get_value(variable);
}

filterx_eval_push_error("No such variable", s, self->variable_name);
Expand All @@ -100,37 +74,34 @@ static void
_update_repr(FilterXExpr *s, FilterXObject *new_repr)
{
FilterXVariableExpr *self = (FilterXVariableExpr *) s;
FilterXScope *scope = filterx_eval_get_scope();
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);
FilterXEvalContext *context = filterx_eval_get_context();
FilterXScope *scope = context->scope;

g_assert(variable != NULL);
filterx_variable_set_value(variable, new_repr, FALSE);
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);
filterx_scope_set_variable(scope, variable, new_repr, FALSE);
}

static gboolean
_assign(FilterXExpr *s, FilterXObject *new_value)
{
FilterXVariableExpr *self = (FilterXVariableExpr *) s;
FilterXScope *scope = filterx_eval_get_scope();
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);

if (self->handle_is_macro)
{
filterx_eval_push_error("Macro based variable cannot be changed", &self->super, self->variable_name);
return FALSE;
}

if (!variable)
{
/* NOTE: we pass NULL as initial_value to make sure the new variable
* is considered changed due to the assignment */
FilterXEvalContext *context = filterx_eval_get_context();
FilterXScope *scope = context->scope;
FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);

variable = filterx_scope_register_variable(scope, self->variable_type, self->handle, NULL);
}
if (!variable)
variable = filterx_scope_register_variable(scope, self->variable_type, self->handle);

/* this only clones mutable objects */
new_value = filterx_object_clone(new_value);
filterx_variable_set_value(variable, new_value, TRUE);
filterx_scope_set_variable(scope, variable, new_value, TRUE);
filterx_object_unref(new_value);
return TRUE;
}
Expand All @@ -139,15 +110,17 @@ static gboolean
_isset(FilterXExpr *s)
{
FilterXVariableExpr *self = (FilterXVariableExpr *) s;
FilterXScope *scope = filterx_eval_get_scope();
FilterXEvalContext *context = filterx_eval_get_context();
FilterXScope *scope = context->scope;
LogMessage *msg = context->msg;

FilterXVariable *variable = filterx_scope_lookup_variable(scope, self->handle);
if (variable)
return filterx_variable_is_set(variable);

FilterXEvalContext *context = filterx_eval_get_context();
LogMessage *msg = context->msg;
return log_msg_is_value_set(msg, self->handle);
if (self->variable_type == FX_VAR_MESSAGE_TIED)
return log_msg_is_value_set(msg, filterx_variable_handle_to_nv_handle(self->handle));
return FALSE;
}

static gboolean
Expand All @@ -162,17 +135,25 @@ _unset(FilterXExpr *s)
}

FilterXEvalContext *context = filterx_eval_get_context();
FilterXScope *scope = context->scope;
LogMessage *msg = context->msg;

FilterXVariable *variable = filterx_scope_lookup_variable(context->scope, self->handle);
if (variable)
{
filterx_variable_unset_value(variable);
filterx_scope_unset_variable(scope, variable);
return TRUE;
}

LogMessage *msg = context->msg;
if (log_msg_is_value_set(msg, self->handle))
_whiteout_variable(self, context);
if (self->variable_type == FX_VAR_MESSAGE_TIED)
{
if (log_msg_is_value_set(msg, self->handle))
{
FilterXVariable *v = filterx_scope_register_variable(context->scope, self->variable_type, self->handle);
/* make sure it is considered changed */
filterx_scope_unset_variable(context->scope, v);
}
}

return TRUE;
}
Expand Down
99 changes: 68 additions & 31 deletions lib/filterx/filterx-scope.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*
*/
#include "filterx/filterx-scope.h"
#include "filterx/object-message-value.h"
#include "scratch-buffers.h"


Expand Down Expand Up @@ -62,69 +63,97 @@ _lookup_variable(FilterXScope *self, FilterXVariableHandle handle, FilterXVariab
static gboolean
_validate_variable(FilterXScope *self, FilterXVariable *variable)
{
if (filterx_variable_is_declared(variable))
return TRUE;

if (filterx_variable_is_floating(variable) &&
!filterx_variable_is_declared(variable) &&
!filterx_variable_is_same_generation(variable, self->generation))
return FALSE;
return TRUE;
}

FilterXVariable *
filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle)
{
FilterXVariable *v;

if (_lookup_variable(self, handle, &v) && _validate_variable(self, v))
return v;
return NULL;
if (filterx_variable_is_message_tied(variable) &&
!filterx_variable_is_same_generation(variable, self->msg->generation))
return FALSE;
return TRUE;
}

static FilterXVariable *
_register_variable(FilterXScope *self,
FilterXVariableType variable_type,
FilterXVariableHandle handle,
FilterXObject *initial_value)
FilterXVariableHandle handle)
{
FilterXVariable *v_slot;

if (_lookup_variable(self, handle, &v_slot))
{
/* already present */
if (!filterx_variable_is_same_generation(v_slot, self->generation))
{
/* existing value is from a previous generation, override it as if
* it was a new value */

filterx_variable_set_generation(v_slot, self->generation);
filterx_variable_set_value(v_slot, initial_value, FALSE);
}
return v_slot;
}
/* turn v_slot into an index */
gsize v_index = ((guint8 *) v_slot - (guint8 *) self->variables->data) / sizeof(FilterXVariable);
g_assert(v_index <= self->variables->len);
g_assert(&g_array_index(self->variables, FilterXVariable, v_index) == v_slot);


/* we register an unset variable here first */
FilterXVariable v;
filterx_variable_init_instance(&v, variable_type, handle, initial_value, self->generation);
filterx_variable_init_instance(&v, variable_type, handle);
g_array_insert_val(self->variables, v_index, v);

return &g_array_index(self->variables, FilterXVariable, v_index);
}

FilterXVariable *
filterx_scope_lookup_variable(FilterXScope *self, FilterXVariableHandle handle)
{
FilterXVariable *v;

if (_lookup_variable(self, handle, &v) &&
_validate_variable(self, v))
return v;
return NULL;
}

static FilterXObject *
_pull_variable_from_message(FilterXScope *self, NVHandle handle)
{
gssize value_len;
LogMessageValueType t;

const gchar *value = log_msg_get_value_if_set_with_type(self->msg, handle, &value_len, &t);
if (!value)
return NULL;

if (log_msg_is_handle_macro(handle))
{
FilterXObject *res = filterx_message_value_new(value, value_len, t);
filterx_object_make_readonly(res);
return res;
}
else
return filterx_message_value_new_borrowed(value, value_len, t);
}


FilterXVariable *
filterx_scope_register_variable(FilterXScope *self,
FilterXVariableType variable_type,
FilterXVariableHandle handle,
FilterXObject *initial_value)
FilterXVariableHandle handle)
{
FilterXVariable *v = _register_variable(self, variable_type, handle, initial_value);
FilterXVariable *v = _register_variable(self, variable_type, handle);

/* the scope needs to be synced with the message if it holds a
* message-tied variable (e.g. $MSG) */
if (filterx_variable_handle_is_message_tied(handle))
self->syncable = TRUE;
if (variable_type == FX_VAR_MESSAGE_TIED)
{
FilterXObject *value = _pull_variable_from_message(self, filterx_variable_handle_to_nv_handle(handle));
self->syncable = TRUE;

/* NOTE: value may be NULL on an error, in that case the variable becomes an unset one */
filterx_variable_set_value(v, value, FALSE, self->msg->generation);
filterx_object_unref(value);
}
else
{
filterx_variable_set_value(v, NULL, FALSE, self->generation);
}
return v;
}

Expand Down Expand Up @@ -155,7 +184,7 @@ filterx_scope_foreach_variable(FilterXScope *self, FilterXScopeForeachFunc func,
void
filterx_scope_sync(FilterXScope *self, LogMessage *msg)
{

filterx_scope_set_message(self, msg);
if (!self->dirty)
{
msg_trace("Filterx sync: not syncing as scope is not dirty",
Expand All @@ -172,6 +201,8 @@ filterx_scope_sync(FilterXScope *self, LogMessage *msg)

GString *buffer = scratch_buffers_alloc();

gint msg_generation = msg->generation;

for (gint i = 0; i < self->variables->len; i++)
{
FilterXVariable *v = &g_array_index(self->variables, FilterXVariable, i);
Expand Down Expand Up @@ -199,16 +230,19 @@ filterx_scope_sync(FilterXScope *self, LogMessage *msg)
}
else if (v->value == NULL)
{
g_assert(v->generation == msg_generation);
msg_trace("Filterx sync: whiteout variable, unsetting in message",
evt_tag_str("variable", log_msg_get_value_name(filterx_variable_get_nv_handle(v), NULL)));
/* we need to unset */
log_msg_unset_value(msg, filterx_variable_get_nv_handle(v));
filterx_variable_unassign(v);
v->generation++;
}
else if (filterx_variable_is_assigned(v) || filterx_object_is_modified_in_place(v->value))
{
LogMessageValueType t;

g_assert(v->generation == msg_generation);
msg_trace("Filterx sync: changed variable in scope, overwriting in message",
evt_tag_str("variable", log_msg_get_value_name(filterx_variable_get_nv_handle(v), NULL)));

Expand All @@ -218,13 +252,17 @@ filterx_scope_sync(FilterXScope *self, LogMessage *msg)
log_msg_set_value_with_type(msg, filterx_variable_get_nv_handle(v), buffer->str, buffer->len, t);
filterx_object_set_modified_in_place(v->value, FALSE);
filterx_variable_unassign(v);
v->generation++;
}
else
{
msg_trace("Filterx sync: variable in scope and message in sync, not doing anything",
evt_tag_str("variable", log_msg_get_value_name(filterx_variable_get_nv_handle(v), NULL)));
v->generation++;
}
}
/* FIXME: hack ! */
msg->generation = msg_generation + 1;
self->dirty = FALSE;
}

Expand Down Expand Up @@ -298,7 +336,6 @@ filterx_scope_make_writable(FilterXScope **pself)
*pself = new;
}
(*pself)->generation++;
g_assert((*pself)->generation < FILTERX_SCOPE_MAX_GENERATION);
return *pself;
}

Expand Down
Loading

0 comments on commit 8248c38

Please sign in to comment.