diff --git a/lib/filter/filter-call.c b/lib/filter/filter-call.c index d309fed9ca..a2d9b8915d 100644 --- a/lib/filter/filter-call.c +++ b/lib/filter/filter-call.c @@ -50,9 +50,10 @@ filter_call_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplate else stats_counter_inc(self->super.not_matched); + msg_set_context(msgs[num_msg - 1]); msg_trace("filter() evaluation started", - evt_tag_str("called-rule", self->rule), - evt_tag_msg_reference(msgs[num_msg - 1])); + evt_tag_str("called-rule", self->rule)); + msg_set_context(NULL); return res ^ s->comp; } diff --git a/lib/filter/filter-cmp.c b/lib/filter/filter-cmp.c index bc2670e74e..dd61c18d74 100644 --- a/lib/filter/filter-cmp.c +++ b/lib/filter/filter-cmp.c @@ -295,6 +295,7 @@ fop_cmp_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplateEval else g_assert_not_reached(); + msg_set_context(msgs[num_msg - 1]); msg_trace("cmp() evaluation result", evt_tag_str("left", left_buf->str), evt_tag_str("operator", self->super.type), @@ -302,8 +303,8 @@ fop_cmp_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplateEval evt_tag_str("compare_mode", _compare_mode_to_string(self->compare_mode)), evt_tag_str("left_type", log_msg_value_type_to_str(left_type)), evt_tag_str("right_type", log_msg_value_type_to_str(right_type)), - evt_tag_int("result", result), - evt_tag_msg_reference(msgs[num_msg - 1])); + evt_tag_int("result", result)); + msg_set_context(NULL); scratch_buffers_reclaim_marked(marker); return result ^ s->comp; diff --git a/lib/filter/filter-in-list.c b/lib/filter/filter-in-list.c index 59edb2d832..2daaf03da8 100644 --- a/lib/filter/filter-in-list.c +++ b/lib/filter/filter-in-list.c @@ -50,9 +50,10 @@ filter_in_list_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTempl APPEND_ZERO(value, value, len); gboolean result = (g_tree_lookup(self->tree, value) != NULL); + msg_set_context(msg); msg_trace("in-list() evaluation started", - evt_tag_str("value", value), - evt_tag_msg_reference(msg)); + evt_tag_str("value", value)); + msg_set_context(NULL); return result ^ s->comp; } diff --git a/lib/filter/filter-netmask.c b/lib/filter/filter-netmask.c index 1d42f908e4..51bf2bbaaf 100644 --- a/lib/filter/filter-netmask.c +++ b/lib/filter/filter-netmask.c @@ -63,11 +63,12 @@ filter_netmask_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTempl else res = FALSE; + msg_set_context(msg); msg_trace("netmask() evaluation started", evt_tag_inaddr("msg_address", addr), evt_tag_inaddr("address", &self->address), - evt_tag_inaddr("netmask", &self->netmask), - evt_tag_msg_reference(msg)); + evt_tag_inaddr("netmask", &self->netmask)); + msg_set_context(NULL); return res ^ s->comp; } diff --git a/lib/filter/filter-netmask6.c b/lib/filter/filter-netmask6.c index 3d5523d965..baf7a723d8 100644 --- a/lib/filter/filter-netmask6.c +++ b/lib/filter/filter-netmask6.c @@ -128,11 +128,12 @@ _eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplateEvalOptions else result = FALSE; + msg_set_context(msg); msg_trace("netmask6() evaluation started", evt_tag_inaddr6("msg_address", address), evt_tag_inaddr6("address", &self->address), - evt_tag_int("prefix", self->prefix), - evt_tag_msg_reference(msg)); + evt_tag_int("prefix", self->prefix)); + msg_set_context(NULL); return result ^ s->comp; } diff --git a/lib/filter/filter-pipe.c b/lib/filter/filter-pipe.c index 22e7b06c80..c21746caec 100644 --- a/lib/filter/filter-pipe.c +++ b/lib/filter/filter-pipe.c @@ -59,18 +59,18 @@ log_filter_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op LogFilterPipe *self = (LogFilterPipe *) s; gboolean res; + msg_set_context(msg); msg_trace(">>>>>> filter rule evaluation begin", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); res = filter_expr_eval_root(self->expr, &msg, path_options); + msg_set_context(msg); msg_trace("<<<<<< filter rule evaluation result", evt_tag_str("result", res ? "matched" : "unmatched"), evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); if (res) { @@ -84,6 +84,7 @@ log_filter_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_op log_msg_drop(msg, path_options, AT_PROCESSED); stats_counter_inc(self->not_matched); } + msg_set_context(NULL); } static LogPipe * diff --git a/lib/filter/filter-pri.c b/lib/filter/filter-pri.c index a8c759e6a9..216fbba6b2 100644 --- a/lib/filter/filter-pri.c +++ b/lib/filter/filter-pri.c @@ -49,11 +49,12 @@ filter_facility_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemp { res = !!(self->valid & (1 << fac_num)); } + msg_set_context(msg); msg_trace("facility() evaluation started", evt_tag_int("fac", fac_num), - evt_tag_printf("valid_fac", "%08x", self->valid), - evt_tag_msg_reference(msg)); + evt_tag_printf("valid_fac", "%08x", self->valid)); + msg_set_context(NULL); return res ^ s->comp; } @@ -79,11 +80,12 @@ filter_severity_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemp res = !!((1 << pri) & self->valid); + msg_set_context(msg); msg_trace("severity() evaluation started", evt_tag_int("pri", pri), - evt_tag_printf("valid_pri", "%08x", self->valid), - evt_tag_msg_reference(msg)); + evt_tag_printf("valid_pri", "%08x", self->valid)); + msg_set_context(NULL); return res ^ s->comp; } diff --git a/lib/filter/filter-re.c b/lib/filter/filter-re.c index b340c0a57b..da0b2e812b 100644 --- a/lib/filter/filter-re.c +++ b/lib/filter/filter-re.c @@ -43,12 +43,13 @@ filter_re_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplateEv LogMessage *msg = msgs[num_msg - 1]; gboolean result; + msg_set_context(msg); msg_trace("match() evaluation started against a name-value pair", evt_tag_msg_value_name("name", self->value_handle), evt_tag_msg_value("value", msg, self->value_handle), - evt_tag_str("pattern", self->matcher->pattern), - evt_tag_msg_reference(msg)); + evt_tag_str("pattern", self->matcher->pattern)); result = log_matcher_match_value(self->matcher, msg, self->value_handle); + msg_set_context(NULL); return result ^ s->comp; } @@ -178,12 +179,13 @@ filter_match_eval_against_program_pid_msg(FilterExprNode *s, LogMessage **msgs, pid_len > 0 ? "]" : "", log_msg_get_value(msg, LM_V_MESSAGE, NULL)); + msg_set_context(msg); msg_trace("match() evaluation started against constructed $PROGRAM[$PID]: $MESSAGE string for compatibility", evt_tag_printf("input", "%s", str), - evt_tag_str("pattern", self->super.matcher->pattern), - evt_tag_msg_reference(msg)); + evt_tag_str("pattern", self->super.matcher->pattern)); result = log_matcher_match_buffer(self->super.matcher, msg, str, -1); + msg_set_context(NULL); g_free(str); return result ^ s->comp; @@ -195,12 +197,13 @@ filter_match_eval_against_template(FilterExprNode *s, LogMessage **msgs, gint nu FilterMatch *self = (FilterMatch *) s; LogMessage *msg = msgs[num_msg - 1]; + msg_set_context(msg); msg_trace("match() evaluation started against template", evt_tag_template("input", self->template, msg, options), evt_tag_str("pattern", self->super.matcher->pattern), - evt_tag_str("template", self->template->template_str), - evt_tag_msg_reference(msg)); + evt_tag_str("template", self->template->template_str)); gboolean result = log_matcher_match_template(self->super.matcher, msg, self->template, options); + msg_set_context(NULL); return result ^ s->comp; } diff --git a/lib/filter/filter-tags.c b/lib/filter/filter-tags.c index 79d1d6b9ca..b31136e751 100644 --- a/lib/filter/filter-tags.c +++ b/lib/filter/filter-tags.c @@ -37,6 +37,7 @@ filter_tags_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplate { FilterTags *self = (FilterTags *)s; LogMessage *msg = msgs[num_msg - 1]; + msg_set_context(msg); gboolean res; gint i; @@ -46,8 +47,7 @@ filter_tags_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplate if (log_msg_is_tag_by_id(msg, tag_id)) { msg_trace("tags() evaluation result, matching tag is found", - evt_tag_str("tag", log_tags_get_by_id(tag_id)), - evt_tag_msg_reference(msg)); + evt_tag_str("tag", log_tags_get_by_id(tag_id))); res = TRUE; return res ^ s->comp; @@ -56,13 +56,12 @@ filter_tags_eval(FilterExprNode *s, LogMessage **msgs, gint num_msg, LogTemplate { msg_trace("tags() evaluation progress, tag is not set", evt_tag_str("tag", log_tags_get_by_id(tag_id)), - evt_tag_int("value", log_msg_is_tag_by_id(msg, tag_id)), - evt_tag_msg_reference(msg)); + evt_tag_int("value", log_msg_is_tag_by_id(msg, tag_id))); } } - msg_trace("tags() evaluation result, none of the tags is present", - evt_tag_msg_reference(msg)); + msg_trace("tags() evaluation result, none of the tags is present"); + msg_set_context(NULL); res = FALSE; return res ^ s->comp; } diff --git a/lib/filterx/filterx-pipe.c b/lib/filterx/filterx-pipe.c index cc4ccea9eb..6118b5a06a 100644 --- a/lib/filterx/filterx-pipe.c +++ b/lib/filterx/filterx-pipe.c @@ -59,20 +59,20 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o if (filterx_scope_has_log_msg_changes(eval_context.scope)) filterx_scope_invalidate_log_msg_cache(eval_context.scope); + msg_set_context(msg); msg_trace(">>>>>> filterx rule evaluation begin", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); NVTable *payload = nv_table_ref(msg->payload); eval_res = filterx_eval_exec(&eval_context, self->block, msg); + msg_set_context(msg); msg_trace("<<<<<< filterx rule evaluation result", filterx_format_eval_result(eval_res), evt_tag_str("rule", self->name), log_pipe_location_tag(s), - evt_tag_int("dirty", filterx_scope_is_dirty(eval_context.scope)), - evt_tag_msg_reference(msg)); + evt_tag_int("dirty", filterx_scope_is_dirty(eval_context.scope))); local_path_options.filterx_context = &eval_context; switch (eval_res) @@ -93,6 +93,7 @@ log_filterx_pipe_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_o g_assert_not_reached(); break; } + msg_set_context(NULL); filterx_eval_deinit_context(&eval_context); nv_table_unref(payload); diff --git a/lib/logmsg/logmsg.c b/lib/logmsg/logmsg.c index 67fce48f2c..b4538ac7d0 100644 --- a/lib/logmsg/logmsg.c +++ b/lib/logmsg/logmsg.c @@ -618,6 +618,7 @@ log_msg_set_value_with_type(LogMessage *self, NVHandle handle, gboolean new_entry = FALSE; g_assert(!log_msg_is_write_protected(self)); + msg_set_context(self); if (handle == LM_V_NONE) return; @@ -633,8 +634,7 @@ log_msg_set_value_with_type(LogMessage *self, NVHandle handle, msg_trace("Setting value", evt_tag_str("name", name), evt_tag_mem("value", value, value_len), - evt_tag_str("type", log_msg_value_type_to_str(type)), - evt_tag_msg_reference(self)); + evt_tag_str("type", log_msg_value_type_to_str(type))); } if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD)) @@ -673,6 +673,7 @@ log_msg_set_value_with_type(LogMessage *self, NVHandle handle, if (_value_invalidates_legacy_header(handle)) log_msg_unset_value(self, LM_V_LEGACY_MSGHDR); + msg_set_context(NULL); } void @@ -685,12 +686,12 @@ void log_msg_unset_value(LogMessage *self, NVHandle handle) { g_assert(!log_msg_is_write_protected(self)); + msg_set_context(self); if (_log_name_value_updates(self)) { msg_trace("Unsetting value", - evt_tag_str("name", log_msg_get_value_name(handle, NULL)), - evt_tag_msg_reference(self)); + evt_tag_str("name", log_msg_get_value_name(handle, NULL))); } if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD)) @@ -720,6 +721,7 @@ log_msg_unset_value(LogMessage *self, NVHandle handle) if (_value_invalidates_legacy_header(handle)) log_msg_unset_value(self, LM_V_LEGACY_MSGHDR); + msg_set_context(NULL); } void @@ -738,6 +740,7 @@ log_msg_set_value_indirect_with_type(LogMessage *self, NVHandle handle, gboolean new_entry = FALSE; g_assert(!log_msg_is_write_protected(self)); + msg_set_context(self); if (handle == LM_V_NONE) return; @@ -754,8 +757,7 @@ log_msg_set_value_indirect_with_type(LogMessage *self, NVHandle handle, evt_tag_str("type", log_msg_value_type_to_str(type)), evt_tag_int("ref_handle", ref_handle), evt_tag_int("ofs", ofs), - evt_tag_int("len", len), - evt_tag_msg_reference(self)); + evt_tag_int("len", len)); } if (!log_msg_chk_flag(self, LF_STATE_OWN_PAYLOAD)) @@ -788,6 +790,8 @@ log_msg_set_value_indirect_with_type(LogMessage *self, NVHandle handle, if (new_entry) log_msg_update_sdata(self, handle, name, name_len); log_msg_update_num_matches(self, handle); + + msg_set_context(NULL); } void @@ -1509,9 +1513,9 @@ log_msg_clone_cow(LogMessage *msg, const LogPathOptions *path_options) memcpy(self, msg, sizeof(*msg)); msg->allocated_bytes = allocated_bytes; + msg_set_context(msg); msg_trace("Message was cloned", - evt_tag_printf("original_msg", "%p", msg), - evt_tag_msg_reference(self)); + evt_tag_printf("original_msg", "%p", msg)); /* every field _must_ be initialized explicitly if its direct * copying would cause problems (like copying a pointer by value) */ @@ -1537,6 +1541,8 @@ log_msg_clone_cow(LogMessage *msg, const LogPathOptions *path_options) if (self->num_tags == 0) self->flags |= LF_STATE_OWN_TAGS; + + msg_set_context(NULL); return self; } diff --git a/lib/logmsg/logmsg.h b/lib/logmsg/logmsg.h index c513eb43fd..76593b78b0 100644 --- a/lib/logmsg/logmsg.h +++ b/lib/logmsg/logmsg.h @@ -552,17 +552,12 @@ void log_msg_registry_init(void); void log_msg_registry_deinit(void); void log_msg_global_init(void); void log_msg_global_deinit(void); -void log_msg_stats_global_init(void); void log_msg_registry_foreach(GHFunc func, gpointer user_data); gint log_msg_lookup_time_stamp_name(const gchar *name); gssize log_msg_get_size(LogMessage *self); -#define evt_tag_msg_reference(msg) \ - evt_tag_printf("msg", "%p", (msg)), \ - evt_tag_printf("rcptid", "%" G_GUINT64_FORMAT, (msg)->rcptid) - static inline EVTTAG * evt_tag_msg_value(const gchar *name, LogMessage *msg, NVHandle value_handle) { diff --git a/lib/logreader.c b/lib/logreader.c index 35fd7de285..139c04485a 100644 --- a/lib/logreader.c +++ b/lib/logreader.c @@ -459,9 +459,9 @@ log_reader_handle_line(LogReader *self, const guchar *line, gint length, LogTran LogMessage *m; m = msg_format_construct_message(&self->options->parse_options, line, length); + msg_set_context(m); msg_debug("Incoming log entry", - evt_tag_mem("input", line, length), - evt_tag_msg_reference(m)); + evt_tag_mem("input", line, length)); msg_format_parse_into(&self->options->parse_options, m, line, length); @@ -487,6 +487,7 @@ log_reader_handle_line(LogReader *self, const guchar *line, gint length, LogTran log_transport_aux_data_foreach(aux, _add_aux_nvpair, m); log_source_post(&self->super, m); + msg_set_context(NULL); log_msg_refcache_stop(); return log_source_free_to_send(&self->super); } diff --git a/lib/logsource.c b/lib/logsource.c index 69116e3fe8..908998d408 100644 --- a/lib/logsource.c +++ b/lib/logsource.c @@ -636,8 +636,7 @@ log_source_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options msg_set_context(msg); msg_diagnostics(">>>>>> Source side message processing begin", - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); /* $HOST setup */ log_source_mangle_hostname(self, msg); diff --git a/lib/logthrsource/logthrsourcedrv.c b/lib/logthrsource/logthrsourcedrv.c index e495fccf78..890ea0cc3d 100644 --- a/lib/logthrsource/logthrsourcedrv.c +++ b/lib/logthrsource/logthrsourcedrv.c @@ -408,13 +408,14 @@ log_threaded_source_worker_close_batch(LogThreadedSourceWorker *self) void log_threaded_source_worker_post(LogThreadedSourceWorker *self, LogMessage *msg) { + msg_set_context(msg); msg_debug("Incoming log message", evt_tag_str("input", log_msg_get_value(msg, LM_V_MESSAGE, NULL)), evt_tag_str("driver", self->control->super.super.id), - evt_tag_int("worker_index", self->worker_index), - evt_tag_msg_reference(msg)); + evt_tag_int("worker_index", self->worker_index)); _apply_message_attributes(self->control, msg); log_source_post(&self->super, msg); + msg_set_context(NULL); if (self->control->auto_close_batches) log_threaded_source_worker_close_batch(self); diff --git a/lib/messages.c b/lib/messages.c index 2134f7b64e..84d58df0e0 100644 --- a/lib/messages.c +++ b/lib/messages.c @@ -50,6 +50,7 @@ typedef struct _MsgContext guint16 recurse_state; guint recurse_warning:1; gchar recurse_trigger[128]; + guint64 original_msg_rcptid; } MsgContext; static gint active_log_level = -1; @@ -84,6 +85,8 @@ msg_set_context(LogMessage *msg) { MsgContext *context = msg_get_context(); + context->original_msg_rcptid = msg ? msg->rcptid : 0; + if (msg && (msg->flags & LF_INTERNAL)) { if (msg->recursed) @@ -240,6 +243,12 @@ msg_event_create(gint prio, const gchar *desc, EVTTAG *tag1, ...) evt_rec_add_tagsv(e, va); va_end(va); } + MsgContext *msg_context = msg_get_context(); + if (msg_context->original_msg_rcptid != 0) + { + EVTTAG *rcptid_tag = evt_tag_printf("rcptid", "%" G_GUINT64_FORMAT, msg_context->original_msg_rcptid); + evt_rec_add_tag(e, rcptid_tag); + } g_mutex_unlock(&evtlog_lock); return e; } diff --git a/lib/parser/parser-expr.c b/lib/parser/parser-expr.c index 5dc3926ffb..0c59385b69 100644 --- a/lib/parser/parser-expr.c +++ b/lib/parser/parser-expr.c @@ -91,18 +91,18 @@ log_parser_queue_method(LogPipe *s, LogMessage *msg, const LogPathOptions *path_ LogParser *self = (LogParser *) s; gboolean success; + msg_set_context(msg); msg_trace(">>>>>> parser rule evaluation begin", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); success = log_parser_process_message(self, &msg, path_options); + msg_set_context(msg); msg_trace("<<<<<< parser rule evaluation result", evt_tag_str("result", success ? "accepted" : "rejected"), evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); if (success) { @@ -114,6 +114,7 @@ log_parser_queue_method(LogPipe *s, LogMessage *msg, const LogPathOptions *path_ (*path_options->matched) = FALSE; log_msg_drop(msg, path_options, AT_PROCESSED); } + msg_set_context(NULL); } static void diff --git a/lib/rewrite/rewrite-expr.c b/lib/rewrite/rewrite-expr.c index aa28988cb4..46a006b6db 100644 --- a/lib/rewrite/rewrite-expr.c +++ b/lib/rewrite/rewrite-expr.c @@ -36,28 +36,29 @@ static void log_rewrite_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options) { LogRewrite *self = (LogRewrite *) s; + msg_set_context(msg); msg_trace(">>>>>> rewrite rule evaluation begin", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); if (self->condition && !filter_expr_eval_root(self->condition, &msg, path_options)) { + msg_set_context(msg); msg_trace("Rewrite condition unmatched, skipping rewrite", evt_tag_str("value", log_msg_get_value_name(self->value_handle, NULL)), evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); } else { self->process(self, &msg, path_options); } + msg_set_context(msg); msg_trace("<<<<<< rewrite rule evaluation finished", evt_tag_str("rule", self->name), - log_pipe_location_tag(s), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(s)); log_pipe_forward_msg(s, msg, path_options); + msg_set_context(NULL); } void diff --git a/lib/rewrite/rewrite-set-facility.c b/lib/rewrite/rewrite-set-facility.c index d9e11b1702..8fd6da285b 100644 --- a/lib/rewrite/rewrite-set-facility.c +++ b/lib/rewrite/rewrite-set-facility.c @@ -86,6 +86,7 @@ log_rewrite_set_facility_process(LogRewrite *s, LogMessage **pmsg, const LogPath GString *result = scratch_buffers_alloc_and_mark(&marker); LogRewriteSetFacility *self = (LogRewriteSetFacility *) s; + msg_set_context(*pmsg); log_msg_make_writable(pmsg, path_options); log_template_format(self->facility, *pmsg, &DEFAULT_TEMPLATE_EVAL_OPTIONS, result); @@ -101,9 +102,9 @@ log_rewrite_set_facility_process(LogRewrite *s, LogMessage **pmsg, const LogPath msg_trace("Setting syslog facility", evt_tag_int("old_facility", (*pmsg)->pri & SYSLOG_FACMASK), - evt_tag_int("new_facility", facility), - evt_tag_msg_reference(*pmsg)); + evt_tag_int("new_facility", facility)); _set_msg_facility(*pmsg, facility); + msg_set_context(NULL); error: scratch_buffers_reclaim_marked(marker); diff --git a/lib/rewrite/rewrite-set-pri.c b/lib/rewrite/rewrite-set-pri.c index 4ec9abb4ed..c3bd733e7f 100644 --- a/lib/rewrite/rewrite-set-pri.c +++ b/lib/rewrite/rewrite-set-pri.c @@ -60,6 +60,7 @@ log_rewrite_set_pri_process(LogRewrite *s, LogMessage **pmsg, const LogPathOptio LogRewriteSetPri *self = (LogRewriteSetPri *) s; GString *result = scratch_buffers_alloc(); + msg_set_context(*pmsg); log_msg_make_writable(pmsg, path_options); log_template_format(self->pri, *pmsg, &DEFAULT_TEMPLATE_EVAL_OPTIONS, result); @@ -75,9 +76,9 @@ log_rewrite_set_pri_process(LogRewrite *s, LogMessage **pmsg, const LogPathOptio msg_trace("Setting syslog pri", evt_tag_int("old_pri", (*pmsg)->pri), - evt_tag_int("new_pri", pri), - evt_tag_msg_reference(*pmsg)); + evt_tag_int("new_pri", pri)); (*pmsg)->pri = pri; + msg_set_context(NULL); } static LogPipe * diff --git a/lib/rewrite/rewrite-set-severity.c b/lib/rewrite/rewrite-set-severity.c index f50ab784cb..770138409f 100644 --- a/lib/rewrite/rewrite-set-severity.c +++ b/lib/rewrite/rewrite-set-severity.c @@ -87,6 +87,7 @@ log_rewrite_set_severity_process(LogRewrite *s, LogMessage **pmsg, const LogPath GString *result = scratch_buffers_alloc_and_mark(&marker); LogRewriteSetSeverity *self = (LogRewriteSetSeverity *) s; + msg_set_context(*pmsg); log_msg_make_writable(pmsg, path_options); log_template_format(self->severity, *pmsg, &DEFAULT_TEMPLATE_EVAL_OPTIONS, result); @@ -102,9 +103,9 @@ log_rewrite_set_severity_process(LogRewrite *s, LogMessage **pmsg, const LogPath msg_trace("Setting syslog severity", evt_tag_int("old_severity", SYSLOG_PRI((*pmsg)->pri)), - evt_tag_int("new_severity", severity), - evt_tag_msg_reference(*pmsg)); + evt_tag_int("new_severity", severity)); _set_msg_severity(*pmsg, severity); + msg_set_context(NULL); error: scratch_buffers_reclaim_marked(marker); diff --git a/modules/add-contextual-data/add-contextual-data.c b/modules/add-contextual-data/add-contextual-data.c index cef440074b..e3fafc00cb 100644 --- a/modules/add-contextual-data/add-contextual-data.c +++ b/modules/add-contextual-data/add-contextual-data.c @@ -114,6 +114,7 @@ _process(LogParser *s, LogMessage **pmsg, { AddContextualData *self = (AddContextualData *) s; LogMessage *msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); gchar *resolved_selector = add_contextual_data_selector_resolve(self->selector, msg); const gchar *selector = resolved_selector; @@ -123,8 +124,7 @@ _process(LogParser *s, LogMessage **pmsg, msg_trace("add-contextual-data(): message lookup finished", evt_tag_str("message", input), evt_tag_str("resolved_selector", resolved_selector), - evt_tag_str("selector", selector), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("selector", selector)); if (selector) context_info_db_foreach_record(self->context_info_db, selector, @@ -132,6 +132,7 @@ _process(LogParser *s, LogMessage **pmsg, (gpointer) msg); g_free(resolved_selector); + msg_set_context(NULL); return TRUE; } diff --git a/modules/afsnmp/snmptrapd-parser.c b/modules/afsnmp/snmptrapd-parser.c index 6c280df323..bab6080b3c 100644 --- a/modules/afsnmp/snmptrapd-parser.c +++ b/modules/afsnmp/snmptrapd-parser.c @@ -175,11 +175,11 @@ snmptrapd_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions * SnmpTrapdParser *self = (SnmpTrapdParser *) s; ScratchBuffersMarker marker; + msg_set_context(*pmsg); log_msg_make_writable(pmsg, path_options); msg_trace("snmptrapd-parser message processing started", evt_tag_str ("input", input), - evt_tag_str ("prefix", self->prefix->str), - evt_tag_msg_reference(*pmsg)); + evt_tag_str ("prefix", self->prefix->str)); APPEND_ZERO(input, input, input_len); @@ -225,6 +225,7 @@ snmptrapd_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions * log_msg_unset_value(nv_context.msg, LM_V_MESSAGE); } + msg_set_context(NULL); return TRUE; } diff --git a/modules/correlation/dbparser.c b/modules/correlation/dbparser.c index a55a24d778..e90fcfc268 100644 --- a/modules/correlation/dbparser.c +++ b/modules/correlation/dbparser.c @@ -200,10 +200,10 @@ log_db_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat } if (self->db) { + msg_set_context(*pmsg); log_msg_make_writable(pmsg, path_options); msg_trace("db-parser message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); if (G_UNLIKELY(self->super.super.template_obj)) matched = pattern_db_process_with_custom_message(self->db, *pmsg, input, input_len); else @@ -220,6 +220,8 @@ log_db_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat matched = TRUE; if (self->super.inject_mode == LDBP_IM_AGGREGATE_ONLY) matched = FALSE; + + msg_set_context(NULL); return matched; } diff --git a/modules/csvparser/csvparser.c b/modules/csvparser/csvparser.c index a5451859bb..86873369c7 100644 --- a/modules/csvparser/csvparser.c +++ b/modules/csvparser/csvparser.c @@ -280,10 +280,10 @@ csv_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_o CSVParser *self = (CSVParser *) s; LogMessage *msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); msg_trace("csv-parser message processing started", evt_tag_str ("input", input), - evt_tag_str ("prefix", self->prefix), - evt_tag_msg_reference(msg)); + evt_tag_str ("prefix", self->prefix)); CSVScanner scanner; csv_scanner_init(&scanner, &self->options, input); @@ -305,6 +305,7 @@ csv_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_o result = TRUE; csv_scanner_deinit(&scanner); + msg_set_context(NULL); return result; } diff --git a/modules/geoip2/geoip-parser.c b/modules/geoip2/geoip-parser.c index d7a2438377..0052faec08 100644 --- a/modules/geoip2/geoip-parser.c +++ b/modules/geoip2/geoip-parser.c @@ -93,10 +93,10 @@ maxminddb_parser_process(LogParser *s, LogMessage **pmsg, { GeoIPParser *self = (GeoIPParser *) s; LogMessage *msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); msg_trace("geoip2-parser message processing started", evt_tag_str("input", input), - evt_tag_str("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("prefix", self->prefix)); MMDB_entry_data_list_s *entry_data_list; if (!_mmdb_load_entry_data_list(self, input, &entry_data_list)) @@ -111,6 +111,7 @@ maxminddb_parser_process(LogParser *s, LogMessage **pmsg, MMDB_free_entry_data_list(entry_data_list); g_array_free(path, TRUE); + msg_set_context(NULL); return TRUE; } diff --git a/modules/grpc/otel/otel-dest-worker.cpp b/modules/grpc/otel/otel-dest-worker.cpp index d25d0c3a89..b472205508 100644 --- a/modules/grpc/otel/otel-dest-worker.cpp +++ b/modules/grpc/otel/otel-dest-worker.cpp @@ -356,11 +356,12 @@ DestWorker::insert(LogMessage *msg) return LTR_QUEUED; drop: + msg_set_context(msg); msg_error("OpenTelemetry: Failed to insert message, dropping message", - log_pipe_location_tag(&owner.super->super.super.super.super), - evt_tag_msg_reference(msg)); + log_pipe_location_tag(&owner.super->super.super.super.super)); /* LTR_DROP currently drops the entire batch */ + msg_set_context(NULL); return LTR_QUEUED; } diff --git a/modules/grpc/otel/otel-protobuf-parser.cpp b/modules/grpc/otel/otel-protobuf-parser.cpp index 3b8928dd79..4fe925a5b5 100644 --- a/modules/grpc/otel/otel-protobuf-parser.cpp +++ b/modules/grpc/otel/otel-protobuf-parser.cpp @@ -57,10 +57,11 @@ _get_string_field(LogMessage *msg, NVHandle handle, gssize *len) if (type != LM_VT_STRING) { + msg_set_context(msg); msg_error("OpenTelemetry: unexpected LogMessage type, while getting string field", - evt_tag_msg_reference(msg), evt_tag_str("name", log_msg_get_value_name(handle, NULL)), evt_tag_str("type", log_msg_value_type_to_str(type))); + msg_set_context(NULL); return nullptr; } @@ -75,10 +76,11 @@ _get_protobuf_field(LogMessage *msg, NVHandle handle, gssize *len) if (type != LM_VT_PROTOBUF) { + msg_set_context(msg); msg_error("OpenTelemetry: unexpected LogMessage type, while getting protobuf field", - evt_tag_msg_reference(msg), evt_tag_str("name", log_msg_get_value_name(handle, NULL)), - evt_tag_str("type", log_msg_value_type_to_str(type))); + evt_tag_str("type", log_msg_value_type_to_str(type)), NULL); + msg_set_context(NULL); return nullptr; } @@ -269,23 +271,30 @@ _extract_saddr(const grpc::string &peer) return NULL; } +static bool +_exit_on_fail() +{ + msg_set_context(NULL); + return false; +} + static bool _parse_metadata(LogMessage *msg, bool set_hostname) { char number_buf[G_ASCII_DTOSTR_BUF_SIZE]; gssize len; const gchar *value; + msg_set_context(msg); /* .otel.resource.<...> */ value = _get_protobuf_field(msg, logmsg_handle::RAW_RESOURCE, &len); if (!value) - return false; + return _exit_on_fail(); Resource resource; if (!resource.ParsePartialFromArray(value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.resource", - evt_tag_msg_reference(msg)); - return false; + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.resource"); + return _exit_on_fail(); } /* .otel.resource.attributes */ @@ -300,19 +309,18 @@ _parse_metadata(LogMessage *msg, bool set_hostname) /* .otel.resource.schema_url */ value = _get_string_field(msg, logmsg_handle::RAW_RESOURCE_SCHEMA_URL, &len); if (!value) - return false; + return _exit_on_fail(); log_msg_set_value_with_type(msg, logmsg_handle::RESOURCE_SCHEMA_URL, value, len, LM_VT_STRING); /* .otel.scope.<...> */ value = _get_protobuf_field(msg, logmsg_handle::RAW_SCOPE, &len); if (!value) - return false; + return _exit_on_fail(); InstrumentationScope scope; if (!scope.ParsePartialFromArray(value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.scope", - evt_tag_msg_reference(msg)); - return false; + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.scope"); + ; } /* .otel.scope.name */ @@ -331,9 +339,10 @@ _parse_metadata(LogMessage *msg, bool set_hostname) /* .otel.scope.schema_url */ value = _get_string_field(msg, logmsg_handle::RAW_SCOPE_SCHEMA_URL, &len); if (!value) - return false; + return _exit_on_fail(); log_msg_set_value_with_type(msg, logmsg_handle::SCOPE_SCHEMA_URL, value, len, LM_VT_STRING); + msg_set_context(NULL); return true; } @@ -383,6 +392,7 @@ static bool _parse_log_record(LogMessage *msg) { gssize len; + msg_set_context(msg); const gchar *raw_value = _get_protobuf_field(msg, logmsg_handle::RAW_LOG, &len); if (!raw_value) return false; @@ -390,8 +400,7 @@ _parse_log_record(LogMessage *msg) LogRecord log_record; if (!log_record.ParsePartialFromArray(raw_value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.log", - evt_tag_msg_reference(msg)); + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.log"); return false; } @@ -457,6 +466,7 @@ _parse_log_record(LogMessage *msg) /* .otel.log.span_id */ _set_value(msg, logmsg_handle::LOG_SPAN_ID, log_record.span_id(), LM_VT_BYTES); + msg_set_context(NULL); return true; } @@ -927,6 +937,7 @@ _add_metric_data_fields(LogMessage *msg, const Metric &metric) static bool _parse_metric(LogMessage *msg) { + msg_set_context(msg); gssize len; const gchar *raw_value = _get_protobuf_field(msg, logmsg_handle::RAW_METRIC, &len); if (!raw_value) @@ -935,8 +946,7 @@ _parse_metric(LogMessage *msg) Metric metric; if (!metric.ParsePartialFromArray(raw_value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.metric", - evt_tag_msg_reference(msg)); + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.metric"); return false; } @@ -954,6 +964,7 @@ _parse_metric(LogMessage *msg) _add_metric_data_fields(msg, metric); + msg_set_context(NULL); return true; } @@ -962,15 +973,15 @@ _parse_span(LogMessage *msg) { gssize len; const gchar *raw_value = _get_protobuf_field(msg, logmsg_handle::RAW_SPAN, &len); + msg_set_context(msg); if (!raw_value) - return false; + return _exit_on_fail(); Span span; if (!span.ParsePartialFromArray(raw_value, len)) { - msg_error("OpenTelemetry: Failed to deserialize .otel_raw.span", - evt_tag_msg_reference(msg)); - return false; + msg_error("OpenTelemetry: Failed to deserialize .otel_raw.span"); + return _exit_on_fail(); } /* .otel.type */ @@ -1098,6 +1109,7 @@ _parse_span(LogMessage *msg) std::snprintf(number_buf, G_N_ELEMENTS(number_buf), "%" PRIi32, status.code()); _set_value(msg, logmsg_handle::SPAN_STATUS_CODE, number_buf, LM_VT_INTEGER); + msg_set_context(NULL); return true; } @@ -1183,21 +1195,24 @@ _nanosec_to_unix_time(uint64_t nanosec, UnixTime *unix_time) static bool _value_case_equals_or_error(LogMessage *msg, const KeyValue &kv, const AnyValue::ValueCase &expected_value_case) { + bool success = true; + msg_set_context(msg); if (kv.value().value_case() != expected_value_case) { msg_error("OpenTelemetry: unexpected attribute value type, skipping", - evt_tag_msg_reference(msg), evt_tag_str("name", kv.key().c_str()), evt_tag_int("type", kv.value().value_case())); - return false; + success = false; } - return true; + msg_set_context(NULL); + return success; } void syslogng::grpc::otel::ProtobufParser::set_syslog_ng_nv_pairs(LogMessage *msg, const KeyValueList &types) { + msg_set_context(msg); for (const KeyValue &nv_pairs_by_type : types.values()) { LogMessageValueType log_msg_type; @@ -1205,14 +1220,12 @@ syslogng::grpc::otel::ProtobufParser::set_syslog_ng_nv_pairs(LogMessage *msg, co if (!log_msg_value_type_from_str(type_as_str.c_str(), &log_msg_type)) { msg_debug("OpenTelemetry: unexpected attribute logmsg type, skipping", - evt_tag_msg_reference(msg), evt_tag_str("type", type_as_str.c_str())); continue; } if (nv_pairs_by_type.value().value_case() != AnyValue::kKvlistValue) { msg_debug("OpenTelemetry: unexpected attribute, skipping", - evt_tag_msg_reference(msg), evt_tag_str("key", type_as_str.c_str())); continue; } @@ -1227,11 +1240,13 @@ syslogng::grpc::otel::ProtobufParser::set_syslog_ng_nv_pairs(LogMessage *msg, co log_msg_set_value_by_name_with_type(msg, name.c_str(), value.c_str(), value.length(), log_msg_type); } } + msg_set_context(NULL); } void syslogng::grpc::otel::ProtobufParser::set_syslog_ng_macros(LogMessage *msg, const KeyValueList ¯os) { + msg_set_context(msg); for (const KeyValue ¯o : macros.values()) { const std::string &name = macro.key(); @@ -1245,7 +1260,6 @@ syslogng::grpc::otel::ProtobufParser::set_syslog_ng_macros(LogMessage *msg, cons else { msg_error("OpenTelemetry: unexpected attribute value type, skipping", - evt_tag_msg_reference(msg), evt_tag_str("name", macro.key().c_str()), evt_tag_int("type", macro.value().value_case())); } @@ -1271,10 +1285,10 @@ syslogng::grpc::otel::ProtobufParser::set_syslog_ng_macros(LogMessage *msg, cons else { msg_debug("OpenTelemetry: unexpected attribute macro, skipping", - evt_tag_msg_reference(msg), evt_tag_str("name", name.c_str())); } } + msg_set_context(NULL); } void @@ -1343,6 +1357,7 @@ syslogng::grpc::otel::ProtobufParser::parse_syslog_ng_tags(LogMessage *msg, cons void syslogng::grpc::otel::ProtobufParser::store_syslog_ng(LogMessage *msg, const LogRecord &log_record) { + msg_set_context(msg); _nanosec_to_unix_time(log_record.time_unix_nano(), &msg->timestamps[LM_TS_STAMP]); _nanosec_to_unix_time(log_record.observed_time_unix_nano(), &msg->timestamps[LM_TS_RECVD]); @@ -1352,7 +1367,6 @@ syslogng::grpc::otel::ProtobufParser::store_syslog_ng(LogMessage *msg, const Log if (attr.value().value_case() != AnyValue::kKvlistValue) { msg_debug("OpenTelemetry: unexpected attribute, skipping", - evt_tag_msg_reference(msg), evt_tag_str("key", key.c_str())); continue; } @@ -1377,10 +1391,10 @@ syslogng::grpc::otel::ProtobufParser::store_syslog_ng(LogMessage *msg, const Log else { msg_debug("OpenTelemetry: unexpected attribute, skipping", - evt_tag_msg_reference(msg), evt_tag_str("key", key.c_str())); } } + msg_set_context(NULL); } bool @@ -1395,8 +1409,9 @@ syslogng::grpc::otel::ProtobufParser::is_syslog_ng_log_record(const Resource &re bool syslogng::grpc::otel::ProtobufParser::process(LogMessage *msg) { - msg_trace("OpenTelemetry: message processing started", - evt_tag_msg_reference(msg)); + msg_set_context(msg); + msg_trace("OpenTelemetry: message processing started"); + bool success = true; gssize len; LogMessageValueType log_msg_type; @@ -1407,46 +1422,60 @@ syslogng::grpc::otel::ProtobufParser::process(LogMessage *msg) if (log_msg_type == LM_VT_NULL) { /* Not an opentelemetry() message or it is a syslog-ng-otlp() message already parsed in the source */ - return true; + goto exit; } if (log_msg_type != LM_VT_STRING) { msg_error("OpenTelemetry: unexpected .otel_raw.type LogMessage type", - evt_tag_msg_reference(msg), evt_tag_str("log_msg_type", log_msg_value_type_to_str(log_msg_type))); - return false; + success = false; + goto exit; } if (!_parse_metadata(msg, this->set_host)) - return false; + { + success = false; + goto exit; + } if (type == "log") { if (!_parse_log_record(msg)) - return false; + { + success = false; + goto exit; + } } else if (type == "metric") { if (!_parse_metric(msg)) - return false; + { + success = false; + goto exit; + } } else if (type == "span") { if (!_parse_span(msg)) - return false; + { + success = false; + goto exit; + } } else { msg_error("OpenTelemetry: unexpected .otel_raw.type", - evt_tag_msg_reference(msg), evt_tag_str("type", type.c_str())); - return false; + success = false; + goto exit; } _unset_raw_fields(msg); - return true; +exit: + msg_set_context(NULL); + return success; } static gboolean diff --git a/modules/json/json-parser.c b/modules/json/json-parser.c index fa284be2e5..445fcc3961 100644 --- a/modules/json/json-parser.c +++ b/modules/json/json-parser.c @@ -332,12 +332,13 @@ json_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_ JSONParser *self = (JSONParser *) s; struct json_object *jso; struct json_tokener *tok; + gboolean success = TRUE; + msg_set_context(*pmsg); msg_trace("json-parser message processing started", evt_tag_str("input", input), evt_tag_str("prefix", self->prefix), - evt_tag_str("marker", self->marker), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("marker", self->marker)); if (self->marker) { if (strncmp(input, self->marker, self->marker_len) != 0) @@ -345,7 +346,8 @@ json_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_ msg_debug("json-parser(): no marker at the beginning of the message, skipping JSON parsing ", evt_tag_str("input", input), evt_tag_str("marker", self->marker)); - return FALSE; + success = FALSE; + goto exit; } input += self->marker_len; @@ -361,7 +363,8 @@ json_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_ evt_tag_str("input", input), tok->err != json_tokener_success ? evt_tag_str ("json_error", json_tokener_error_desc(tok->err)) : NULL); json_tokener_free (tok); - return FALSE; + success = FALSE; + goto exit; } json_tokener_free(tok); @@ -372,11 +375,14 @@ json_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_ evt_tag_str("input", input), evt_tag_str("extract_prefix", self->extract_prefix)); json_object_put(jso); - return FALSE; + success = FALSE; + goto exit; } json_object_put(jso); - return TRUE; +exit: + return success; + msg_set_context(NULL); } static LogPipe * diff --git a/modules/kvformat/kv-parser.c b/modules/kvformat/kv-parser.c index 5f213d6e3b..1cc9a8614f 100644 --- a/modules/kvformat/kv-parser.c +++ b/modules/kvformat/kv-parser.c @@ -110,10 +110,10 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co GString *formatted_key = scratch_buffers_alloc(); log_msg_make_writable(pmsg, path_options); + msg_set_context(*pmsg); msg_trace("kv-parser message processing started", evt_tag_str("input", input), - evt_tag_str("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("prefix", self->prefix)); /* FIXME: input length */ kv_scanner_input(&kv_scanner, input); while (kv_scanner_scan_next(&kv_scanner)) @@ -130,6 +130,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co kv_scanner_get_stray_words(&kv_scanner), -1); kv_scanner_deinit(&kv_scanner); + msg_set_context(NULL); return TRUE; } diff --git a/modules/map-value-pairs/map-value-pairs.c b/modules/map-value-pairs/map-value-pairs.c index 62a75ebf48..88a7b21cf3 100644 --- a/modules/map-value-pairs/map-value-pairs.c +++ b/modules/map-value-pairs/map-value-pairs.c @@ -41,14 +41,15 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, MapValuePairs *self = (MapValuePairs *) s; GlobalConfig *cfg = log_pipe_get_config(&s->super); LogMessage *msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); msg_trace("value-pairs message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); LogTemplateEvalOptions options = {&cfg->template_options, LTZ_LOCAL, 0, NULL, LM_VT_STRING}; value_pairs_foreach(self->value_pairs, _map_name_values, msg, &options, msg); + msg_set_context(NULL); return TRUE; } diff --git a/modules/metrics-probe/metrics-probe.c b/modules/metrics-probe/metrics-probe.c index 9239de334f..d78e9785dc 100644 --- a/modules/metrics-probe/metrics-probe.c +++ b/modules/metrics-probe/metrics-probe.c @@ -94,9 +94,9 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co { MetricsProbe *self = (MetricsProbe *) s; + msg_set_context(*pmsg); msg_trace("metrics-probe message processing started", - evt_tag_str("key", self->metrics_template->key), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("key", self->metrics_template->key)); if (!dyn_metrics_template_is_enabled(self->metrics_template)) return TRUE; @@ -106,6 +106,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co gssize increment = _calculate_increment(self, *pmsg); stats_counter_add(counter, increment); + msg_set_context(NULL); return TRUE; } diff --git a/modules/python/python-logparser.c b/modules/python/python-logparser.c index 93fa010877..1f7832f383 100644 --- a/modules/python/python-logparser.c +++ b/modules/python/python-logparser.c @@ -199,12 +199,12 @@ python_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat gstate = PyGILState_Ensure(); { LogMessage *msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); msg_trace("python-parser message processing started", evt_tag_str("input", input), evt_tag_str("parser", self->super.name), - evt_tag_str("class", self->binding.class), - evt_tag_msg_reference(msg)); + evt_tag_str("class", self->binding.class)); PyObject *msg_object = py_log_message_new(msg, cfg); result = _py_invoke_parser_process(self, msg_object); @@ -212,6 +212,7 @@ python_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat } PyGILState_Release(gstate); + msg_set_context(NULL); return result; } diff --git a/modules/python/python-tf.c b/modules/python/python-tf.c index 9fd51e29a8..3445a02fa7 100644 --- a/modules/python/python-tf.c +++ b/modules/python/python-tf.c @@ -58,7 +58,9 @@ static PyObject * _py_invoke_template_function(PythonTfState *state, const gchar *function_name, LogMessage *msg, gint argc, GString *const *argv) { - PyObject *callable, *ret, *args; + PyObject *callable, *args; + PyObject *ret = NULL; + msg_set_context(msg); callable = _py_resolve_qualified_name(function_name); if (!callable) @@ -69,12 +71,11 @@ _py_invoke_template_function(PythonTfState *state, const gchar *function_name, L evt_tag_str("function", function_name), evt_tag_str("exception", _py_format_exception_text(buf, sizeof(buf)))); _py_finish_exception_handling(); - return NULL; + goto exit; } msg_debug("$(python): Invoking Python template function", - evt_tag_str("function", function_name), - evt_tag_msg_reference(msg)); + evt_tag_str("function", function_name)); args = _py_construct_args_tuple(state, msg, argc, argv); ret = PyObject_CallObject(callable, args); @@ -89,8 +90,10 @@ _py_invoke_template_function(PythonTfState *state, const gchar *function_name, L evt_tag_str("function", function_name), evt_tag_str("exception", _py_format_exception_text(buf, sizeof(buf)))); _py_finish_exception_handling(); - return NULL; + goto exit; } +exit: + msg_set_context(NULL); return ret; } diff --git a/modules/regexp-parser/regexp-parser.c b/modules/regexp-parser/regexp-parser.c index 4910d829ea..7142f20049 100644 --- a/modules/regexp-parser/regexp-parser.c +++ b/modules/regexp-parser/regexp-parser.c @@ -101,10 +101,10 @@ regexp_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat RegexpParser *self = (RegexpParser *) s; log_msg_make_writable(pmsg, path_options); + msg_set_context(*pmsg); msg_trace("regexp-parser message processing started", evt_tag_str("input", input), - evt_tag_str("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("prefix", self->prefix)); gboolean result = FALSE; for (GList *item = self->matchers; item; item = item->next) @@ -124,6 +124,7 @@ regexp_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat } } + msg_set_context(NULL); return result; } diff --git a/modules/syslogformat/sdata-parser.c b/modules/syslogformat/sdata-parser.c index 6522f1df59..22b8524f0c 100644 --- a/modules/syslogformat/sdata-parser.c +++ b/modules/syslogformat/sdata-parser.c @@ -32,12 +32,13 @@ sdata_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path LogMessage *msg; msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); msg_trace("sdata-parser() message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); const guchar *data = (const guchar *) input; gint data_len = input_len; + msg_set_context(NULL); return _syslog_format_parse_sd(msg, &data, &data_len, &self->parse_options); } diff --git a/modules/syslogformat/syslog-format.c b/modules/syslogformat/syslog-format.c index 102d82e1bd..ff8bd3cc79 100644 --- a/modules/syslogformat/syslog-format.c +++ b/modules/syslogformat/syslog-format.c @@ -952,12 +952,13 @@ _syslog_format_check_framing(LogMessage *msg, const guchar **data, gint *length) /* we did indeed find a series of digits that look like framing, that's * probably not what was intended. */ + msg_set_context(msg); msg_debug("RFC5425 style octet count was found at the start of the message, this is probably not what was intended", - evt_tag_mem("data", *data, src - (*data)), - evt_tag_msg_reference(msg)); + evt_tag_mem("data", *data, src - (*data))); log_msg_set_tag_by_id(msg, LM_T_SYSLOG_UNEXPECTED_FRAMING); *data = src; *length = left; + msg_set_context(NULL); } static void diff --git a/modules/syslogformat/syslog-parser.c b/modules/syslogformat/syslog-parser.c index f2071673cf..1e536b7632 100644 --- a/modules/syslogformat/syslog-parser.c +++ b/modules/syslogformat/syslog-parser.c @@ -39,18 +39,21 @@ syslog_parser_process(LogParser *s, LogMessage **pmsg, const LogPathOptions *pat LogMessage *msg; msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); msg_trace("syslog-parser message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); if (self->drop_invalid) { gsize problem_position = 0; - return msg_format_try_parse_into(&self->parse_options, msg, (guchar *) input, input_len, &problem_position); + gboolean res = msg_format_try_parse_into(&self->parse_options, msg, (guchar *) input, input_len, &problem_position); + msg_set_context(NULL); + return res; } else { msg_format_parse_into(&self->parse_options, msg, (guchar *) input, input_len); + msg_set_context(NULL); return TRUE; } } diff --git a/modules/systemd-journal/journal-reader.c b/modules/systemd-journal/journal-reader.c index 1dd4e296b4..c543d70d60 100644 --- a/modules/systemd-journal/journal-reader.c +++ b/modules/systemd-journal/journal-reader.c @@ -311,11 +311,12 @@ _handle_message(JournalReader *self) _set_program(self->options, msg); _set_transport(msg); + msg_set_context(msg); msg_debug("Incoming log entry from journal", - evt_tag_printf("input", "%s", log_msg_get_value(msg, LM_V_MESSAGE, NULL)), - evt_tag_msg_reference(msg)); + evt_tag_printf("input", "%s", log_msg_get_value(msg, LM_V_MESSAGE, NULL))); log_source_post(&self->super, msg); + msg_set_context(NULL); return log_source_free_to_send(&self->super); } diff --git a/modules/tagsparser/tags-parser.c b/modules/tagsparser/tags-parser.c index 8d09c8639f..14f21e51b2 100644 --- a/modules/tagsparser/tags-parser.c +++ b/modules/tagsparser/tags-parser.c @@ -38,9 +38,9 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co gsize input_len) { LogMessage *msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); msg_trace("tags-parser message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); ListScanner scanner; list_scanner_init(&scanner); @@ -51,6 +51,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co log_msg_set_tag_by_name(msg, list_scanner_get_current_value(&scanner)); } + msg_set_context(NULL); list_scanner_deinit(&scanner); return TRUE; } diff --git a/modules/timestamp/date-parser.c b/modules/timestamp/date-parser.c index 428b920d35..7db41c358d 100644 --- a/modules/timestamp/date-parser.c +++ b/modules/timestamp/date-parser.c @@ -170,9 +170,9 @@ date_parser_process(LogParser *s, LogMessage *msg = log_msg_make_writable(pmsg, path_options); UnixTime time_stamp; + msg_set_context(msg); msg_trace("date-parser message processing started", - evt_tag_str("input", input), - evt_tag_msg_reference(*pmsg)); + evt_tag_str("input", input)); /* this macro ensures zero termination by copying input to a * g_alloca()-d buffer if necessary. In most cases it's not though. @@ -187,6 +187,7 @@ date_parser_process(LogParser *s, _store_timestamp(self, msg, &time_stamp); + msg_set_context(NULL); return res; } diff --git a/modules/xml/windows-eventlog-xml-parser.c b/modules/xml/windows-eventlog-xml-parser.c index 208afb705e..ac912d6227 100644 --- a/modules/xml/windows-eventlog-xml-parser.c +++ b/modules/xml/windows-eventlog-xml-parser.c @@ -85,11 +85,11 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co { XMLParser *self = (XMLParser *) s; LogMessage *msg = log_msg_make_writable(pmsg, path_options); + msg_set_context(msg); XMLScanner xml_scanner; msg_trace("windows-eventlog-xml-parser message processing started", evt_tag_str ("input", input), - evt_tag_str ("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str ("prefix", self->prefix)); PushParams push_params = {.msg = msg, .create_lists = self->create_lists, .prefix = self->prefix}; xml_scanner_init(&xml_scanner, &self->options, &scanner_push_function, &push_params, self->prefix); @@ -100,6 +100,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co goto err; xml_scanner_deinit(&xml_scanner); + msg_set_context(NULL); return TRUE; err: @@ -108,6 +109,7 @@ _process(LogParser *s, LogMessage **pmsg, const LogPathOptions *path_options, co evt_tag_int("forward_invalid", self->forward_invalid)); g_error_free(error); xml_scanner_deinit(&xml_scanner); + msg_set_context(NULL); return self->forward_invalid; } diff --git a/modules/xml/xml.c b/modules/xml/xml.c index ddf3ab214e..d5230d81c0 100644 --- a/modules/xml/xml.c +++ b/modules/xml/xml.c @@ -104,10 +104,10 @@ xml_parser_process(LogParser *s, LogMessage **pmsg, XMLParser *self = (XMLParser *) s; LogMessage *msg = log_msg_make_writable(pmsg, path_options); XMLScanner xml_scanner; + msg_set_context(msg); msg_trace("xml-parser message processing started", evt_tag_str ("input", input), - evt_tag_str ("prefix", self->prefix), - evt_tag_msg_reference(*pmsg)); + evt_tag_str ("prefix", self->prefix)); PushParams push_params = {.msg = msg, .create_lists = self->create_lists}; xml_scanner_init(&xml_scanner, &self->options, &scanner_push_function, &push_params, self->prefix); @@ -118,6 +118,7 @@ xml_parser_process(LogParser *s, LogMessage **pmsg, goto err; xml_scanner_deinit(&xml_scanner); + msg_set_context(NULL); return TRUE; err: @@ -126,6 +127,7 @@ xml_parser_process(LogParser *s, LogMessage **pmsg, evt_tag_int("forward_invalid", self->forward_invalid)); g_error_free(error); xml_scanner_deinit(&xml_scanner); + msg_set_context(NULL); return self->forward_invalid; }