processor_cumulative_to_delta: add cumulative->delta processor for metrics #11472

Open
edsiper wants to merge 9 commits into master from processor_cumulative_to_delta

Conversation

@edsiper (Member) commented Feb 16, 2026

This PR:

  • Introduce new processor_cumulative_to_delta plugin.
  • Convert cumulative monotonic sums to delta values.
  • Convert cumulative histograms to delta values.
  • Add reset handling with drop_on_reset behavior.
  • Add first-sample behavior controls via initial_value (auto|keep|drop) and compatibility option drop_first.
  • Drop out-of-order samples per timeseries to avoid invalid deltas.
  • Keep non-monotonic sums unchanged (passthrough).
  • Add state tracking per timeseries and bounded cleanup/eviction logic.
  • Add internal and runtime tests for counters/histograms, reset behavior, first-sample modes, out-of-order samples, multi-series, and non-monotonic-sum passthrough.
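As a rough illustration of the behaviors listed above, the per-series conversion can be sketched like this (a simplified model with hypothetical names, not the plugin's actual code):

```c
/* Hypothetical per-series state; the real plugin keys this by a series hash. */
struct series_state {
    int seen;                          /* set once a first sample was observed */
    double last_value;
    unsigned long long last_timestamp;
};

enum c2d_action { C2D_EMIT, C2D_DROP };

/* Convert one cumulative monotonic-sum sample to a delta, covering the
 * first-sample, out-of-order and reset cases described above. */
enum c2d_action convert_sample(struct series_state *state,
                               unsigned long long timestamp, double value,
                               int drop_first, int drop_on_reset,
                               double *delta)
{
    if (!state->seen) {
        state->seen = 1;
        state->last_value = value;
        state->last_timestamp = timestamp;
        if (drop_first) {
            return C2D_DROP;           /* default compatibility behavior */
        }
        *delta = value;                /* 'keep': first point becomes a delta */
        return C2D_EMIT;
    }

    if (timestamp <= state->last_timestamp) {
        return C2D_DROP;               /* out-of-order: delta would be invalid */
    }

    if (value < state->last_value) {   /* reset: monotonic sum went backwards */
        state->last_value = value;
        state->last_timestamp = timestamp;
        if (drop_on_reset) {
            return C2D_DROP;
        }
        *delta = value;                /* post-reset value as delta baseline */
        return C2D_EMIT;
    }

    *delta = value - state->last_value;
    state->last_value = value;
    state->last_timestamp = timestamp;
    return C2D_EMIT;
}
```

With drop_first and drop_on_reset enabled, for example, the cumulative sequence 10, 15, 4 yields a single emitted delta of 5 and two dropped samples.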

Config options:

  • initial_value (string, default: unset): first-point behavior for new series: auto, keep, or drop. If unset, the drop_first compatibility mode is used.
  • drop_first (bool, default: true): compatibility option, used only when initial_value is unset. When enabled, the first sample is dropped.
  • drop_on_reset (bool, default: true): if enabled, drop samples when a monotonic sum or histogram reset is detected. If disabled, emit the post-reset value as the delta baseline.
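For illustration, a minimal configuration using these options might look like the following (a hypothetical sketch in Fluent Bit's YAML config format; the input plugin choice and option values are placeholders, not part of this PR):

```yaml
pipeline:
  inputs:
    - name: fluentbit_metrics
      processors:
        metrics:
          - name: cumulative_to_delta
            initial_value: drop
            drop_on_reset: true
  outputs:
    - name: stdout
      match: '*'
```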

Enter [N/A] in the box if an item is not applicable to your change.

Testing
Before we can approve your change, please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0; by submitting this pull request, I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Adds a cumulative-to-delta processor to convert cumulative counters and histograms into deltas, with configurable initial-value, drop-first, and drop-on-reset behaviors and series/staleness controls.
    • Metrics library: adds start-timestamp support, exponential-histogram snapshots, and new summary sum/count setters.
  • Build/Configuration

    • New build-time option to enable the cumulative-to-delta processor plugin.
  • Tests

    • Adds unit and end-to-end tests covering resets, out-of-order, multi-series, histograms, and runtime pipeline scenarios.
  • Documentation

    • Updated metrics library README with examples and OTLP/start-timestamp guidance.

coderabbitai bot commented Feb 16, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Adds a new cumulative_to_delta processor plugin (build option, plugin registration, plugin entry, public header), implements a core per-series cumulative→delta conversion engine, updates cmetrics to support start timestamps and exp-hist snapshots/atomics, and adds unit and runtime tests plus build/test wiring.

Changes

  • Build & Plugin Registration (cmake/plugins_options.cmake, plugins/CMakeLists.txt): add the FLB_PROCESSOR_CUMULATIVE_TO_DELTA build option and register the processor_cumulative_to_delta plugin.
  • Plugin Sources & CMake (plugins/processor_cumulative_to_delta/CMakeLists.txt, plugins/processor_cumulative_to_delta/cumulative_to_delta.c, plugins/processor_cumulative_to_delta/cumulative_to_delta.h): new plugin target, implementation (config parsing, lifecycle, metrics processing), and public header exposing the context API and initial-value modes.
  • Core Conversion Engine (plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c): large new module: per-series state, hashing/keys, delta computation for counters/histograms/exp-histograms, TTL/GC, eviction, reset detection, and public context APIs.
  • Tests, Unit & Build Wiring (tests/internal/CMakeLists.txt, tests/internal/cumulative_to_delta.c): conditional test wiring for metrics+plugin and comprehensive unit tests covering counters, histograms, resets, ordering, and initial-value modes.
  • Tests, Runtime (tests/runtime/processor_cumulative_to_delta.c): end-to-end runtime test suite exercising OTLP pipeline integration and processor behavior under multiple scenarios.
  • cmetrics Public API & Headers (lib/cmetrics/include/cmetrics/cmt_metric.h, lib/cmetrics/include/cmetrics/cmt_summary.h): add start_timestamp fields, an exp-hist snapshot type, new getters/setters, and summary sum/count setters (public API surface expanded).
  • cmetrics Core & Snapshots (lib/cmetrics/src/cmt_metric.c, lib/cmetrics/src/cmt_exp_histogram.c, lib/cmetrics/src/cmt_exp_histogram.*): implement the exp-hist snapshot API, start_timestamp handling, setters/getters, snapshot creation/destruction, and safe bucket replacement with locking.
  • cmetrics Decoding/Encoding Paths (lib/cmetrics/src/cmt_decode_*.c, lib/cmetrics/src/cmt_encode_*.c, lib/cmetrics/src/cmt_encode_msgpack.c, lib/cmetrics/src/cmt_encode_opentelemetry.c, lib/cmetrics/src/cmt_encode_prometheus*.c, lib/cmetrics/src/cmt_encode_text.c): propagate start_timestamp and exp-hist snapshot usage across decoders/encoders; replace direct field access with atomic-safe setters/getters and snapshots.
  • cmetrics Atomic Infrastructure (lib/cmetrics/src/cmt_atomic_generic.c, lib/cmetrics/src/cmt_atomic_msvc.c): move to a one-time initialization bootstrap (pthread_once / InitOnceExecuteOnce), change error handling to return-based, and guard atomic ops initialization.
  • cmetrics Merging & Thread-safety (lib/cmetrics/src/cmt_cat.c, lib/cmetrics/src/cmt_summary.c): convert several merging and summary updates to atomic operations and introduce locking/compare-exchange loops for safe concurrent updates.
  • Misc cmetrics & README (lib/cmetrics/CMakeLists.txt, lib/cmetrics/README.md, lib/cmetrics/tests/encoding.c): patch version bump, README rewrite, and small test I/O/guard fixes.

Sequence Diagram(s)

sequenceDiagram
    participant Input as Fluent Bit Input
    participant Processor as cumulative_to_delta<br/>processor
    participant Core as flb_cumulative_to_delta_ctx
    participant CMetrics as cmetrics (cmt)
    participant Output as Fluent Bit Output

    Input->>Processor: deliver metrics context (cmt)
    activate Processor
    Processor->>CMetrics: inspect for cumulative metrics
    alt no cumulative metrics
        Processor->>Output: pass-through metrics
    else cumulative metrics present
        Processor->>Processor: clone metrics context
        Processor->>Core: flb_cumulative_to_delta_ctx_process(cloned cmt)
        activate Core
        Core->>CMetrics: lookup/build per-series keys
        Core->>Core: compute deltas (counter/hist/exp-hist) using stored state
        Core->>Core: apply drop_first / drop_on_reset / GC / eviction
        Core-->>Processor: return transformed metrics context
        deactivate Core
        Processor->>Output: emit transformed (delta) metrics
    end
    deactivate Processor

Estimated Code Review Effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Suggested Reviewers

  • cosmo0920
  • niedbalski
  • patrick-stephens
  • celalettin1286

Poem

🐰 I hop through metrics, spot sums that grow tall,
I remember each series, each rise and each fall.
I nibble the first sample, skip resets with a grin,
Turn cumuls to deltas — a tidy little win.
Hoppy conversions — hoppity-hop, tally and spin!

🚥 Pre-merge checks: 2 passed, 1 failed

  • ❌ Docstring Coverage (Warning): docstring coverage is 1.37%, below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.
  • ✅ Description Check (Passed): check skipped because CodeRabbit's high-level summary is enabled.
  • ✅ Title Check (Passed): the title clearly and concisely describes the main change, adding a cumulative-to-delta processor for metrics, and directly matches the primary objective of the PR.


@chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 703844ae01

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

@coderabbitai bot left a comment

Actionable comments posted: 7

🤖 Fix all issues with AI agents
In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c`:
- Around line 111-125: When bucket_count changes, freeing
state->last_hist_buckets and then attempting flb_calloc may fail and return
early while leaving state->last_hist_bucket_count set to the old non-zero value,
which will cause a NULL dereference later in process_histogram_sample; fix by
setting state->last_hist_bucket_count = 0 immediately after freeing
last_hist_buckets (before calling flb_calloc) so the state is consistent on
allocation failure, then only set last_hist_bucket_count = bucket_count after a
successful flb_calloc; reference symbols: state->last_hist_buckets,
state->last_hist_bucket_count, flb_calloc, and process_histogram_sample.
- Around line 249-264: The subtraction (now - state->last_update_timestamp) in
series_state_gc can wrap when the system clock moves backward; update the code
to avoid underflow by either (A) using a monotonic timestamp for 'now' wherever
cfl_time_now() is called (use CLOCK_MONOTONIC) so all last_update_timestamp and
now are comparable, or (B) add a guard inside series_state_gc: skip eviction
when now < state->last_update_timestamp (i.e., continue) or compute the age with
a conditional to prevent unsigned underflow before comparing to
context->series_ttl; apply the change around the symbols series_state_gc,
last_update_timestamp, series_ttl, series_state_table_del, and
series_state_destroy.
- Around line 617-638: The destroy function flb_cumulative_to_delta_ctx_destroy
currently destroys series state entries without removing their hash table
entries first; update the cfl_list_foreach_safe loop that iterates
context->series_list so that for each state (struct
flb_cumulative_to_delta_series) you first call series_state_table_del(context,
state) and then call series_state_destroy(state), mirroring the sequence used in
series_state_gc and series_state_evict_if_needed, before finally destroying
context->series_table and freeing context.
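The clock-skew guard suggested in the second item above can be sketched as follows (an illustrative helper with hypothetical names, assuming unsigned nanosecond timestamps; not code from this PR):

```c
#include <stdint.h>

/* Age of a series given 'now' and its last update time; returns 0 instead
 * of a huge wrapped value when the system clock moved backwards. */
uint64_t series_age(uint64_t now, uint64_t last_update_timestamp)
{
    if (now < last_update_timestamp) {
        return 0;   /* clock went backwards: treat the series as fresh */
    }
    return now - last_update_timestamp;
}
```

A GC pass would then evict a series only when series_age(now, state->last_update_timestamp) exceeds the configured TTL, making the unsigned-underflow case impossible.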

In `@plugins/processor_cumulative_to_delta/cumulative_to_delta.c`:
- Around line 139-203: cb_process_metrics currently returns a new out_cmt when
it converts cumulative metrics but leaves the original metrics_context alive,
relying on the caller to free it; fix by making ownership explicit: when you set
*out_context = out_cmt in cb_process_metrics, also free the original
metrics_context (cmt_destroy(metrics_context)) before returning, or
alternatively always return the original context and perform in-place
conversion; update cb_process_metrics, the symbols metrics_context, out_cmt, and
*out_context accordingly and add a short comment documenting the ownership
contract (referenced in flb_processor.c and input_metrics_append in
flb_input_metric.c) so callers know which side must free which cmt.

In `@plugins/processor_cumulative_to_delta/Untitled`:
- Line 1: The file named "Untitled" appears to be an orphan placeholder
containing the symbol flb_cumulative_to_delta_ctx_process; either remove this
file if it's unused, or rename it to a meaningful name (e.g., exports.txt or
processor_exports.txt) and update any build config to reference it (e.g., add
the new name to CMakeLists.txt or the plugin's export list). Ensure the file
name follows the naming convention used by other processor plugins and that any
symbol it provides (flb_cumulative_to_delta_ctx_process) is exported/declared
from the proper source module rather than left in a stray placeholder file.

In `@tests/internal/CMakeLists.txt`:
- Around line 244-254: Split the test guards so the generic metrics test (e.g.,
the entry that adds metrics.c) is guarded only by FLB_METRICS, and the
processor-specific cumulative_to_delta test (the block that sets
CUMULATIVE_TO_DELTA_PLUGIN_DIR/CUMULATIVE_TO_DELTA_CORE_SOURCE and calls
target_sources/target_include_directories on flb-it-cumulative_to_delta) is
guarded by both FLB_METRICS AND FLB_PROCESSOR_CUMULATIVE_TO_DELTA; update the
conditional around the cumulative_to_delta_core.c addition to require
(FLB_METRICS AND FLB_PROCESSOR_CUMULATIVE_TO_DELTA) while leaving metrics.c
under FLB_METRICS alone.

In `@tests/runtime/processor_cumulative_to_delta.c`:
- Around line 117-134: observation_add_capture currently returns index even when
observed.capture_count >= MAX_CAPTURE_METRICS causing callers
(observation_get_value_count / observation_get_value) to read out-of-bounds;
change observation_add_capture to return a sentinel (e.g., -1) when the array is
full and do not increment or write into observed.captures, keeping the
pthread_mutex_lock/unlock semantics intact, and update callers to check for a
negative return before indexing into observed.captures (references:
observation_add_capture, observation_get_value_count, observation_get_value).
🧹 Nitpick comments (6)
tests/runtime/processor_cumulative_to_delta.c (3)

147-167: No bounds checks on capture_index and value_index in accessor functions.

observation_get_value_count and observation_get_value index directly into observed.captures[] and .values[] without validating bounds. Coupled with the observation_add_capture issue above, this could lead to out-of-bounds reads. Consider adding guard checks (even in test code) to fail fast rather than silently corrupt.


56-108: strstr doesn't respect line boundaries.

strstr(line_start, line_prefix) can match a prefix that starts within the current line but extends past line_end. For short metric names in tests this is unlikely to cause false matches, but be aware this is not a strict per-line search.
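A strict per-line check can be done with a bounded comparison instead of strstr (an illustrative helper, not code from this PR):

```c
#include <string.h>

/* Return 1 only if the line delimited by [line_start, line_end) begins
 * with prefix; the comparison can never extend past line_end. */
int line_starts_with(const char *line_start, const char *line_end,
                     const char *prefix)
{
    size_t prefix_len = strlen(prefix);

    if ((size_t) (line_end - line_start) < prefix_len) {
        return 0;   /* prefix longer than the line: cannot match */
    }
    return memcmp(line_start, prefix, prefix_len) == 0;
}
```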


340-470: Repetitive teardown pattern — consider a goto cleanup chain.

Every error branch in rt_ctx_create duplicates flb_destroy(context->flb); flb_free(context); return NULL;. A labeled cleanup pattern (e.g., goto cleanup) would reduce duplication and lower the risk of future cleanup omissions.
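The labeled-cleanup pattern suggested here might look like the following sketch (stub allocators stand in for the real flb_* calls, which are not reproduced here):

```c
#include <stdlib.h>

struct rt_ctx {
    void *flb;          /* stands in for the flb_create() handle */
    void *processor;    /* stands in for the processor instance */
};

static void *stub_create(int should_fail)
{
    return should_fail ? NULL : malloc(1);
}

/* Single exit path: every error branch jumps to 'cleanup', which frees
 * whatever was allocated so far, so teardown code exists exactly once. */
struct rt_ctx *rt_ctx_create_sketch(int fail_processor)
{
    struct rt_ctx *context;

    context = calloc(1, sizeof(struct rt_ctx));
    if (context == NULL) {
        return NULL;
    }

    context->flb = stub_create(0);
    if (context->flb == NULL) {
        goto cleanup;
    }

    context->processor = stub_create(fail_processor);
    if (context->processor == NULL) {
        goto cleanup;
    }

    return context;

cleanup:
    /* free(NULL) is a no-op, so partially-built contexts are safe here */
    free(context->processor);
    free(context->flb);
    free(context);
    return NULL;
}
```

Because every member is zero-initialized by calloc and free(NULL) is harmless, new allocation steps only need a goto, not their own teardown sequence.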

plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c (3)

266-282: cfl_list_size traversal on every loop iteration makes eviction O(n²).

cfl_list_size walks the entire list on each iteration of the while loop. With max_series = 65536, this could become noticeable. A simple count maintained alongside list mutations (or queried once before the loop and decremented) would reduce this to O(n).

Proposed fix
 static void series_state_evict_if_needed(struct flb_cumulative_to_delta_ctx *context)
 {
+    size_t current_size;
     struct flb_cumulative_to_delta_series *state;
 
-    while (cfl_list_size(&context->series_list) > context->max_series) {
+    current_size = cfl_list_size(&context->series_list);
+
+    while (current_size > context->max_series) {
         if (cfl_list_is_empty(&context->series_list)) {
             break;
         }
 
         state = cfl_list_entry_first(&context->series_list,
                                      struct flb_cumulative_to_delta_series,
                                      _head);
 
         series_state_table_del(context, state);
         series_state_destroy(state);
+        current_size--;
     }
 }

284-350: Double hash lookup on "new series" path and redundant state update on reset path.

Two observations in process_counter_sample:

  1. Lines 297-302: series_state_get(…, FLB_FALSE) followed by series_state_get(…, FLB_TRUE) performs two full key-build + hash-lookup cycles for every new series. A single call with create_if_missing = FLB_TRUE plus a was_created out-parameter (or checking state fields post-return) would halve the work.

  2. Lines 326-334 update state on reset, then line 345 updates state again with the same timestamp/value. The second update is redundant for the reset path.

Neither is a correctness issue, just wasted work on the hot path.


352-491: Same double-lookup pattern in process_histogram_sample.

Lines 377-382 duplicate the two-step series_state_get(FLB_FALSE) then series_state_get(FLB_TRUE) pattern noted above for counters.

Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
@edsiper (Member, Author) commented Feb 17, 2026

@codex review

@coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
Verify each finding against the current code and only fix it if needed.


In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c`:
- Around line 635-656: The code calls series_state_update_counter(state,
timestamp, current_value) twice when reset_detected == FLB_TRUE and
drop_on_reset == FLB_FALSE; remove the redundant update by ensuring
series_state_update_counter is only invoked once per code path: if
reset_detected and context->drop_on_reset is true return FLB_C2D_DROP (no
update), otherwise compute delta (set delta = current_value for reset case or
current_value - state->last_counter_value for normal case), call
cmt_metric_set(sample, timestamp, delta) and then call
series_state_update_counter(state, timestamp, current_value) exactly once;
update the logic around the reset_detected block to reflect this flow and
reference series_state_update_counter, reset_detected, context->drop_on_reset,
delta, and cmt_metric_set when making the change.

In `@tests/runtime/processor_cumulative_to_delta.c`:
- Around line 407-424: The processor allocated with flb_processor_create
(context->processor) is not freed on error paths (e.g., when
flb_processor_unit_create fails) causing a leak; update the error handling so
before calling flb_destroy(context->flb) and flb_free(context) you call the
processor cleanup routine (e.g., flb_processor_destroy(context->processor)) and
set context->processor = NULL, and apply the same cleanup in any later failure
branches prior to flb_input_set_processor to ensure the processor is always
destroyed on error.
🧹 Nitpick comments (1)
plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c (1)

635-656: Redundant state update when reset detected and drop_on_reset is false.

When reset_detected == FLB_TRUE and drop_on_reset == FLB_FALSE, series_state_update_counter is called at line 636 (inside the reset block), then called again at line 654 with the same (timestamp, current_value) arguments. The second call is unnecessary.

♻️ Suggested simplification
     if (reset_detected == FLB_TRUE) {
-        if (series_state_update_counter(state, timestamp, current_value) != 0) {
-            return -1;
-        }
-
         if (context->drop_on_reset == FLB_TRUE) {
+            if (series_state_update_counter(state, timestamp, current_value) != 0) {
+                return -1;
+            }
             return FLB_C2D_DROP;
         }
     }

Comment on lines +407 to +424

    context->processor = flb_processor_create(context->flb->config,
                                              "unit_test",
                                              NULL,
                                              0);
    if (context->processor == NULL) {
        flb_destroy(context->flb);
        flb_free(context);
        return NULL;
    }

    unit = flb_processor_unit_create(context->processor,
                                     FLB_PROCESSOR_METRICS,
                                     "cumulative_to_delta");
    if (unit == NULL) {
        flb_destroy(context->flb);
        flb_free(context);
        return NULL;
    }

⚠️ Potential issue | 🟡 Minor

Minor leak: processor is not freed on early failure before flb_input_set_processor.

If flb_processor_unit_create fails (line 420) or any subsequent step fails before line 448, context->processor was allocated via flb_processor_create but is not linked to any input. flb_destroy(context->flb) won't know about it, so it leaks.

This is test-only code and only on error paths, so it's low impact.

🩹 Proposed fix: add processor cleanup on error
     unit = flb_processor_unit_create(context->processor,
                                      FLB_PROCESSOR_METRICS,
                                      "cumulative_to_delta");
     if (unit == NULL) {
+        flb_processor_destroy(context->processor);
         flb_destroy(context->flb);
         flb_free(context);
         return NULL;
     }

Apply similar cleanup in subsequent error paths before flb_input_set_processor.


@chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: a04039df74


Comment on lines 118 to +121

    set(UNIT_TESTS_FILES
      ${UNIT_TESTS_FILES}
      metrics.c
      cumulative_to_delta.c

P1: Gate cumulative_to_delta unit test on processor option

cumulative_to_delta.c is always added to UNIT_TESTS_FILES when FLB_METRICS is on, but its implementation is only attached later under if(FLB_METRICS AND FLB_PROCESSOR_CUMULATIVE_TO_DELTA). In builds that disable this processor (-DFLB_PROCESSOR_CUMULATIVE_TO_DELTA=OFF) while keeping metrics/internal tests enabled, flb-it-cumulative_to_delta is still generated and will fail at link time with unresolved flb_cumulative_to_delta_ctx_* symbols.



    cfl_hash_64bits_update(state, &count, sizeof(count));

    cfl_list_foreach(head, &kvlist->list) {

P2: Make context hash independent from metadata key order

compute_context_identity() depends on hash_kvlist(), but hash_kvlist() hashes entries in insertion order by iterating kvlist->list directly. For resource/scope metadata that are semantically map-like, the same attributes in a different order will produce a different identity, so unchanged timeseries can be treated as new state and incorrectly re-trigger first-sample/reset behavior instead of producing continuous deltas.

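One way to make the identity independent of attribute order, without sorting, is to hash each key/value pair separately and combine the per-pair hashes with a commutative operation. The sketch below uses FNV-1a and plain C arrays rather than the cfl hashing API and kvlist types; all names are illustrative:

```c
#include <stddef.h>
#include <stdint.h>

/* FNV-1a over "key=value" for one attribute pair */
static uint64_t hash_pair(const char *key, const char *value)
{
    const char *parts[3];
    uint64_t hash = 1469598103934665603ULL;
    const char *p;
    int i;

    parts[0] = key;
    parts[1] = "=";
    parts[2] = value;

    for (i = 0; i < 3; i++) {
        for (p = parts[i]; *p != '\0'; p++) {
            hash ^= (uint64_t) (unsigned char) *p;
            hash *= 1099511628211ULL;
        }
    }
    return hash;
}

/* Addition is commutative, so {host=a, region=us} and {region=us, host=a}
 * produce the same identity regardless of insertion order. */
uint64_t hash_attributes(const char **keys, const char **values, size_t count)
{
    uint64_t hash = 0;
    size_t i;

    for (i = 0; i < count; i++) {
        hash += hash_pair(keys[i], values[i]);
    }
    return hash;
}
```

Sorting the keys before hashing achieves the same goal while keeping a stronger non-commutative mix; either approach would stop reordered metadata from resetting per-series state.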

Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
@coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c (1)

566-582: cfl_list_size in a while loop makes eviction O(n²).

cfl_list_size likely traverses the entire linked list on each iteration. If many entries need eviction (e.g., after a burst that exceeds max_series), this becomes quadratic. Consider maintaining a running count of series entries in the context struct to make the size check O(1).

♻️ Sketch: track series count in context

Add a size_t series_count field to flb_cumulative_to_delta_ctx, increment on insert in series_state_get, decrement on delete in series_state_table_del/series_state_destroy, and use it here:

-    while (cfl_list_size(&context->series_list) > context->max_series) {
-        if (cfl_list_is_empty(&context->series_list)) {
-            break;
-        }
+    while (context->series_count > context->max_series) {
+        if (cfl_list_is_empty(&context->series_list)) {
+            break;
+        }
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c`:
- Line 1244: The code sets counter->aggregation_type =
CMT_AGGREGATION_TYPE_DELTA before iterating samples, which can leave the metric
in an inconsistent state if process_counter_sample fails; change the flow so you
first iterate and call process_counter_sample for all samples, aborting and
returning -1 on any failure without mutating counter->aggregation_type, and only
after all samples succeed set counter->aggregation_type =
CMT_AGGREGATION_TYPE_DELTA; apply the same deferred-commit pattern to histogram
code paths (defer setting histogram->aggregation_type to
CMT_AGGREGATION_TYPE_DELTA until process_histogram_sample-like calls succeed) or
alternatively document/convert failures to unrecoverable if you prefer not to
leave partially-processed metrics.
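The deferred-commit flow described in this finding can be sketched as follows (hypothetical simplified types; the real code operates on cmt metric structures):

```c
#include <stddef.h>

enum aggregation { AGG_CUMULATIVE, AGG_DELTA };

struct metric {
    enum aggregation aggregation_type;
    double samples[8];
    size_t sample_count;
};

/* Hypothetical per-sample converter that fails on a negative sample. */
static int process_sample(double *sample)
{
    if (*sample < 0.0) {
        return -1;
    }
    return 0;
}

/* Process every sample first; flip aggregation_type only after all of
 * them succeed, so a mid-loop failure never leaves a half-converted
 * metric labeled as DELTA. */
int convert_metric(struct metric *metric)
{
    size_t i;

    for (i = 0; i < metric->sample_count; i++) {
        if (process_sample(&metric->samples[i]) != 0) {
            return -1;   /* aggregation_type untouched on failure */
        }
    }

    metric->aggregation_type = AGG_DELTA;
    return 0;
}
```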

---

Duplicate comments:
In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c`:
- Around line 1419-1441: The destructor must remove hash table entries before
freeing each series to avoid dangling table references; in
flb_cumulative_to_delta_ctx_destroy iterate the context->series_list calling
series_state_table_del(context, state) and series_state_destroy(state) (as
already present), then call flb_hash_table_destroy(context->series_table) and
set context->series_table = NULL before flb_free(context) to ensure no
use-after-free of the table pointer; keep these calls in
flb_cumulative_to_delta_ctx_destroy and ensure series_state_table_del and
series_state_destroy are used exactly as shown.


Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (4)
plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c (4)

584-659: Redundant state update in the reset + keep path.

When reset_detected == FLB_TRUE and drop_on_reset == FLB_FALSE, the state is updated at line 636 and again unconditionally at line 654 with the same values. You could remove the update inside the reset block and let the unconditional update at line 654 handle both cases. The early return at line 641 (drop_on_reset == TRUE) would need its own update, which it already has at line 636.

This is minor — no correctness impact.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c` around
lines 584 - 659, In process_counter_sample, when reset_detected is true you
currently call series_state_update_counter(state, timestamp, current_value)
twice; remove the redundant call inside the reset branch and rely on the
unconditional series_state_update_counter at the end to perform the update for
the "keep" path, but keep the existing update in the reset branch when
context->drop_on_reset == FLB_TRUE (so the early-return/drop path still updates
state before returning); adjust the reset branch to only set delta and handle
the drop-return without duplicating the update.

861-1218: Consider extracting the reset-detection and delta-computation phases into helper functions.

process_exp_histogram_sample spans ~350 lines with deeply nested logic for reset detection across four separate bucket-iteration loops, followed by delta computation and state update. Breaking this into smaller functions (e.g., exp_hist_detect_reset, exp_hist_compute_delta) would improve readability and testability.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c` around
lines 861 - 1218, The function process_exp_histogram_sample is too long—extract
reset-detection and delta-computation into helpers: implement
exp_hist_detect_reset(state, sample) that encapsulates all reset logic (checks
on scale/threshold/sum_set, count/zero_count comparison and the four
bucket-iteration loops using exp_hist_bucket_index_from_offset and
exp_hist_bucket_get_value) and returns a boolean; and implement
exp_hist_compute_delta(state, sample, reset_detected, &count_delta,
&zero_count_delta, &sum_delta, &cumulative_positive_snapshot,
&cumulative_negative_snapshot) to perform the snapshot allocation/copy,
per-bucket subtraction, and compute count/zero/sum deltas (mirroring the current
behavior and ensuring snapshots are freed on error). Replace the inlined blocks
in process_exp_histogram_sample with calls to these helpers, preserving calls to
series_state_update_exp_histogram and the error/freeing semantics.

566-582: cfl_list_size() called on every iteration of the eviction loop.

cfl_list_size() typically traverses the entire list (O(n)), making this while-loop O(n × k) where k is the number of evictions. Consider caching the count or computing it once before the loop and decrementing.

Suggested fix
 static void series_state_evict_if_needed(struct flb_cumulative_to_delta_ctx *context)
 {
+    int count;
     struct flb_cumulative_to_delta_series *state;
 
-    while (cfl_list_size(&context->series_list) > context->max_series) {
+    count = cfl_list_size(&context->series_list);
+
+    while (count > (int) context->max_series) {
         if (cfl_list_is_empty(&context->series_list)) {
             break;
         }
 
         state = cfl_list_entry_first(&context->series_list,
                                      struct flb_cumulative_to_delta_series,
                                      _head);
 
         series_state_table_del(context, state);
         series_state_destroy(state);
+        count--;
     }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c` around
lines 566 - 582, In series_state_evict_if_needed, avoid calling cfl_list_size()
each iteration — read the current size once into a local variable (e.g., int
count = cfl_list_size(&context->series_list)) and use while (count >
context->max_series) { ... decrement count after removing an entry } instead of
repeatedly calling cfl_list_size(); keep the existing checks that use
cfl_list_is_empty(&context->series_list) and continue to call
series_state_table_del(context, state) and series_state_destroy(state) for each
evicted state from context->series_list.

1443-1494: Partial processing on mid-batch failure.

If process_counter_map succeeds for some counters but process_histogram_map fails, the function returns -1 but the counters in metrics_context have already been mutated to delta values. The caller must treat the entire metrics context as tainted on error. This is an inherent trade-off of in-place mutation; consider documenting that on failure the metrics_context must be discarded.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c` around
lines 1443 - 1494, The function flb_cumulative_to_delta_ctx_process can
partially mutate metrics_context (via
process_counter_map/process_histogram_map/process_exp_histogram_map) and return
-1 on mid-batch failure, leaving the metrics_context in a tainted state; update
the function contract by adding a clear comment above
flb_cumulative_to_delta_ctx_process stating that any non-zero return means the
metrics_context has been partially mutated and MUST be discarded by the caller,
and additionally set a visible taint flag (e.g., add/set
context->last_process_tainted or a similar field on struct
flb_cumulative_to_delta_ctx) right before returning -1 so callers can
programmatically detect the tainted state after failures.
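A taint flag of the kind suggested above might look like the following sketch. The struct, field, and stage functions are hypothetical stand-ins (the prompt's `last_process_tainted` name is used, but the real context struct differs); the point is only the contract: any non-zero return marks the context tainted so callers know to discard the batch.

```c
struct c2d_ctx {
    int last_process_tainted;  /* set when a batch was partially mutated */
};

/* Stand-in for process_counter_map / process_histogram_map. */
static int process_stage(int should_fail)
{
    return should_fail ? -1 : 0;
}

/*
 * Contract: a non-zero return means the metrics context may have been
 * partially mutated and MUST be discarded by the caller; the flag makes
 * that state queryable after the fact.
 */
static int ctx_process(struct c2d_ctx *ctx, int fail_second_stage)
{
    ctx->last_process_tainted = 0;

    if (process_stage(0) != 0 ||
        process_stage(fail_second_stage) != 0) {
        ctx->last_process_tainted = 1;
        return -1;
    }
    return 0;
}
```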
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c`:
- Around line 1419-1441: Ensure flb_cumulative_to_delta_ctx_destroy follows the
same teardown ordering used by series_state_gc/series_state_evict_if_needed:
iterate the context->series_list and for each entry call
series_state_table_del(context, state) before series_state_destroy(state), then
destroy context->series_table with flb_hash_table_destroy if not NULL and
finally flb_free(context); verify the function
flb_cumulative_to_delta_ctx_destroy contains these exact calls and null checks
for context and context->series_table to maintain consistency.
- Around line 1220-1275: Move the mutation of the metric's aggregation_type so
it only occurs after all samples are processed successfully: in
process_counter_map ensure counter->aggregation_type =
CMT_AGGREGATION_TYPE_DELTA is set only after both the metric_static_set block
and the cfl_list_foreach_safe loop complete without returning -1 (error) or
FLB_C2D_DROP leaving partial state; apply the same pattern to the histogram and
exponential histogram handlers (e.g., process_histogram_map and
process_exponential_histogram_map) so their aggregation_type is only updated
after all samples are processed without failures.
- Around line 541-564: The clock-skew guard in series_state_gc is correct: keep
the if (now >= state->last_update_timestamp) check to skip entries when system
time moved backward; to clarify intent, add a short comment above that check in
series_state_gc referencing now and state->last_update_timestamp that explains
we intentionally skip eviction on backward clock adjustments (cfl_time_now uses
CLOCK_REALTIME) rather than changing eviction logic or using monotonic time; no
logic changes to series_state_table_del or series_state_destroy are needed.


Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>
Signed-off-by: Eduardo Silva <eduardo@chronosphere.io>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (6)
lib/cmetrics/src/cmt_atomic_msvc.c (1)

64-68: Redundant guard conditions in each atomic operation.

After a successful cmt_atomic_initialize() (returns 0), both atomic_operation_system_initialized and atomic_operation_system_status are guaranteed to be in their "success" states (1 and 0 respectively). The extra checks are harmless but never trigger independently. The generic backend at cmt_atomic_generic.c Line 61-62 omits the status check — consider aligning the two for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/cmetrics/src/cmt_atomic_msvc.c` around lines 64 - 68, The guard in
cmt_atomic_msvc.c redundantly checks atomic_operation_system_initialized and
atomic_operation_system_status after cmt_atomic_initialize(); remove the
unnecessary status check and align with the generic backend by changing the
condition to only test cmt_atomic_initialize() and
atomic_operation_system_initialized (i.e., if (cmt_atomic_initialize() != 0 ||
atomic_operation_system_initialized == 0) return 0;), keeping the same behavior
but eliminating the never-triggered atomic_operation_system_status check.
plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c (3)

1474-1525: Consider using cfl_list_foreach_safe for the outer metric-type loops.

Lines 1493, 1502, and 1511 use cfl_list_foreach to iterate metrics_context->counters, histograms, and exp_histograms. While the inner process_*_map functions only remove samples from the map's metrics list (not the counter/histogram entries from the cmt-level lists), using _safe would be defensive against future changes that might remove entire metric entries during processing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c` around
lines 1474 - 1525, In flb_cumulative_to_delta_ctx_process, replace the three
uses of cfl_list_foreach over metrics_context->counters, ->histograms, and
->exp_histograms with cfl_list_foreach_safe to guard against removals during
iteration; add a second iterator variable (e.g., struct cfl_list *tmp) alongside
head and use cfl_list_foreach_safe(head, tmp, &metrics_context->counters) (and
similarly for histograms and exp_histograms) so process_counter_map,
process_histogram_map, and process_exp_histogram_map can safely remove list
entries in future without corrupting the loop.

570-586: cfl_list_size in the eviction loop is O(n) per iteration, making mass eviction O(n²).

If max_series is dynamically reduced (via flb_cumulative_to_delta_ctx_configure) while many series are tracked, this loop becomes quadratic because cfl_list_size walks the entire list on every iteration. Consider maintaining an active_series_count field on the context to make the size check O(1).

Also, the cfl_list_is_empty check on Line 575 is unreachable since the while condition already guarantees list_size > max_series > 0.

Sketch: maintain a size counter
 struct flb_cumulative_to_delta_ctx {
     ...
+    size_t series_count;
     struct flb_hash_table *series_table;
     struct cfl_list series_list;
 };

Then increment/decrement series_count in series_state_get (on add), series_state_destroy/series_state_gc/series_state_evict_if_needed (on remove), and use it in the while-loop condition.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c` around
lines 570 - 586, The eviction loop in series_state_evict_if_needed is O(n²)
because cfl_list_size is called each iteration and the cfl_list_is_empty check
is redundant; add an active_series_count (or series_count) field to struct
flb_cumulative_to_delta_ctx and use it for O(1) checks in
series_state_evict_if_needed (replace cfl_list_size with active_series_count and
remove the unreachable cfl_list_is_empty branch), and update this counter
whenever series are added/removed — increment in series_state_get (or the
function that creates/adds a series) and decrement in series_state_destroy,
series_state_table_del/series_state_gc and inside series_state_evict_if_needed
when you remove entries — also ensure flb_cumulative_to_delta_ctx_configure
adjusts/uses the new counter appropriately.

588-672: Redundant state update on reset-without-drop path.

When reset_detected == FLB_TRUE and drop_on_reset == FLB_FALSE, series_state_update_counter is called twice with the same arguments — once at Line 640 and again at Line 667. The second call overwrites identical values. Consider restructuring so the state update only happens once (at Line 667), and the early return at Line 645 uses the first call.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/processor_cumulative_to_delta/cumulative_to_delta_core.c` around
lines 588 - 672, In process_counter_sample, avoid calling
series_state_update_counter twice when reset_detected is true: remove the early
unconditional call to series_state_update_counter inside the reset handling
block and instead call it only once at the end of the function; to preserve
behavior for the drop-on-reset path, add a conditional that when reset_detected
is true and context->drop_on_reset is true you call
series_state_update_counter(state, timestamp, current_value) once and
immediately return FLB_C2D_DROP, otherwise do not call it there and let the
single final series_state_update_counter(state, timestamp, current_value)
execute before returning FLB_C2D_KEEP. This touches process_counter_sample, the
reset_detected boolean and context->drop_on_reset, and the
series_state_update_counter calls.
lib/cmetrics/src/cmt_atomic_generic.c (1)

86-104: Pre-existing: cmt_atomic_store proceeds with the write even when pthread_mutex_lock fails.

Line 97-99 check the lock result but still fall through to writing *storage at Line 101 without the lock held. This is a pre-existing issue acknowledged by the TODO at Lines 28-32, not introduced by this PR.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/cmetrics/src/cmt_atomic_generic.c` around lines 86 - 104, In
cmt_atomic_store, currently the function continues to write *storage = new_value
even if pthread_mutex_lock(&atomic_operation_lock) fails; update
cmt_atomic_store so that after calling pthread_mutex_lock you check the returned
result and do not proceed with the write unless the lock was successfully
acquired (result == 0) — on failure, call an appropriate failure path (e.g.,
return immediately or log via existing error mechanism) and avoid calling
pthread_mutex_unlock; keep the existing early initialization checks
(cmt_atomic_initialize and atomic_operation_system_initialized) and reference
the same symbols: cmt_atomic_store, pthread_mutex_lock(&atomic_operation_lock),
atomic_operation_lock, *storage, new_value, cmt_atomic_initialize, and
atomic_operation_system_initialized.
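A sketch of the guarded store the prompt asks for, assuming a plain pthread mutex. `guarded_store` and its `int` return type are illustrative; the real `cmt_atomic_store` signature may differ. The key change is that the write is skipped entirely when the lock cannot be taken, instead of falling through unprotected.

```c
#include <pthread.h>
#include <stdint.h>

static pthread_mutex_t atomic_operation_lock = PTHREAD_MUTEX_INITIALIZER;

/* Abort the store when locking fails; never write without the lock. */
static int guarded_store(uint64_t *storage, uint64_t new_value)
{
    if (pthread_mutex_lock(&atomic_operation_lock) != 0) {
        return -1;  /* lock failed: *storage is left untouched */
    }

    *storage = new_value;
    pthread_mutex_unlock(&atomic_operation_lock);

    return 0;
}
```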
lib/cmetrics/src/cmt_metric.c (1)

297-306: Spinlock has no backoff or yield — acceptable for short critical sections.

The busy-wait CAS loop will burn CPU under contention. Given that the protected sections (bucket pointer swaps and snapshot copies) are very short, this is practical. If contention becomes a concern in the future, consider adding a sched_yield() or brief pause inside the loop.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/cmetrics/src/cmt_metric.c` around lines 297 - 306, The spinlock loop in
cmt_metric_exp_hist_lock busy-waits without any backoff, which can burn CPU
under contention; modify cmt_metric_exp_hist_lock to add a brief pause or yield
inside the while loop (e.g., call sched_yield() or a short nanosleep or a
CPU-relax/hint) so the thread yields when the lock isn't acquired, while keeping
the existing CAS-based acquisition logic and fast path intact; update includes
if needed and leave cmt_metric_exp_hist_unlock unchanged.
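A standalone sketch of the CAS loop with a yield, assuming C11 atomics. The names mirror `cmt_metric_exp_hist_lock`/`_unlock` but this is not the library's code: `sched_yield()` gives up the timeslice on each failed acquisition attempt while keeping the fast uncontended path a single CAS.

```c
#include <sched.h>
#include <stdatomic.h>

static atomic_int exp_hist_lock_flag;

/* Spin with a yield so waiters do not burn CPU under contention. */
static void exp_hist_lock(void)
{
    int expected = 0;

    while (!atomic_compare_exchange_weak(&exp_hist_lock_flag,
                                         &expected, 1)) {
        expected = 0;      /* CAS overwrote it with the observed value */
        sched_yield();     /* back off instead of busy-waiting */
    }
}

static void exp_hist_unlock(void)
{
    atomic_store(&exp_hist_lock_flag, 0);
}
```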
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:

In `@lib/cmetrics/src/cmt_encode_splunk_hec.c`:
- Around line 73-103: initialize_temporary_metric currently calls
cfl_sds_create(source_label->name) without checking source_label->name for NULL;
before calling cfl_sds_create in initialize_temporary_metric, add a guard that
checks if source_label->name is NULL and if so call cfl_sds_create("") (empty
string) instead, and treat allocation failure the same way you already do (free
destination_label and call destroy_temporary_metric_labels(destination) and
return -1); reference the function initialize_temporary_metric and the
struct/field cmt_map_label->name when locating the change.


}
while (result == 0);

/* histoggram sum */

⚠️ Potential issue | 🟡 Minor

Typo: "histoggram" → "histogram".

Proposed fix
-    /* histoggram sum */
+    /* histogram sum */
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/cmetrics/src/cmt_cat.c` at line 156, Replace the misspelled comment "/*
histoggram sum */" with the correct spelling "/* histogram sum */" in cmt_cat.c
(locate the exact comment string to update); ensure no other occurrences of
"histoggram" remain in the file by searching and correcting them to "histogram".

Comment on lines +199 to +225
has_start_timestamp = cmt_metric_has_start_timestamp(metric);
if (has_start_timestamp) {
start_timestamp = cmt_metric_get_start_timestamp(metric);
s++;
}
else {
start_timestamp = 0;
}

has_exp_hist_snapshot = CMT_FALSE;

if (map->type == CMT_EXP_HISTOGRAM) {
if (cmt_metric_exp_hist_get_snapshot(metric, &snapshot) != 0) {
return -1;
}
has_exp_hist_snapshot = CMT_TRUE;
}

mpack_start_map(writer, s);

mpack_write_cstr(writer, "ts");
mpack_write_uint(writer, metric->timestamp);
mpack_write_uint(writer, cmt_metric_get_timestamp(metric));

if (has_start_timestamp) {
mpack_write_cstr(writer, "start_ts");
mpack_write_uint(writer, start_timestamp);
}

⚠️ Potential issue | 🟠 Major

Handle snapshot acquisition failures to avoid corrupt msgpack arrays.

pack_metric now returns -1 if cmt_metric_exp_hist_get_snapshot fails (Line 211-213), but callers in pack_basic_type ignore the return. This can leave the values array length mismatched with actual elements, producing invalid msgpack. Please propagate the error or write a placeholder element.

🛠️ Example propagation (pack_basic_type)
-    if (map->metric_static_set) {
-        pack_metric(writer, map, &map->metric);
-    }
+    if (map->metric_static_set) {
+        if (pack_metric(writer, map, &map->metric) != 0) {
+            return -1;
+        }
+    }

     cfl_list_foreach(head, &map->metrics) {
         metric = cfl_list_entry(head, struct cmt_metric, _head);
-        pack_metric(writer, map, metric);
+        if (pack_metric(writer, map, metric) != 0) {
+            return -1;
+        }
     }

You’ll also want to propagate this up through pack_context_metrics / pack_context so cmt_encode_msgpack_create can fail cleanly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/cmetrics/src/cmt_encode_msgpack.c` around lines 199 - 225, pack_metric
can return -1 when cmt_metric_exp_hist_get_snapshot fails, but callers (notably
pack_basic_type) ignore that and continue writing the values array, corrupting
msgpack; update pack_basic_type to check the return value from pack_metric and
if it returns -1 immediately propagate -1 up, and then propagate that error
through pack_context_metrics and pack_context so cmt_encode_msgpack_create can
fail cleanly (alternatively, if you prefer a placeholder element, write one
consistent placeholder and still return an error); locate and modify the
functions pack_metric, pack_basic_type, pack_context_metrics, pack_context and
the top-level cmt_encode_msgpack_create to ensure each checks called helper
return values and returns -1 on failure, referencing
cmt_metric_exp_hist_get_snapshot and the values array handling in
pack_basic_type to fix the mismatch.

Comment on lines +73 to +103
static int initialize_temporary_metric(struct cmt_metric *destination,
struct cmt_metric *source)
{
struct cfl_list *head;
struct cmt_map_label *source_label;
struct cmt_map_label *destination_label;

memset(destination, 0, sizeof(struct cmt_metric));
cfl_list_init(&destination->labels);

cfl_list_foreach(head, &source->labels) {
source_label = cfl_list_entry(head, struct cmt_map_label, _head);

destination_label = calloc(1, sizeof(struct cmt_map_label));
if (destination_label == NULL) {
destroy_temporary_metric_labels(destination);
return -1;
}

destination_label->name = cfl_sds_create(source_label->name);
if (destination_label->name == NULL) {
free(destination_label);
destroy_temporary_metric_labels(destination);
return -1;
}

cfl_list_add(&destination_label->_head, &destination->labels);
}

cmt_metric_set_timestamp(destination, cmt_metric_get_timestamp(source));


⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, let's understand the cmt_map_label structure and cfl_sds_create behavior
rg -n "typedef struct cmt_map_label|cmt_map_label\s*\{" -A 10 -g "*.h"

Repository: fluent/fluent-bit



🏁 Script executed:

#!/bin/bash
# Check if cfl_sds_create handles NULL safely
rg -n "cfl_sds_create" -B 2 -A 5 -g "*.c" -g "*.h" | head -100



🏁 Script executed:

#!/bin/bash
# Search for how labels are created/populated to understand if name can be NULL
rg -n "name\s*=\s*cfl_sds_create|->name\s*=\s*NULL|calloc.*cmt_map_label" -g "*.c" -g "*.h" -B 2 -A 2 | head -80



🏁 Script executed:

#!/bin/bash
# Look for existing NULL checks on label names in the codebase
rg -n "label->name.*NULL|source_label.*NULL|destination_label.*NULL" -g "*.c" -g "*.h"



🏁 Script executed:

#!/bin/bash
# Look at the initialize_temporary_metric function and nearby code in cmt_encode_splunk_hec.c
cat -n lib/cmetrics/src/cmt_encode_splunk_hec.c | sed -n '70,110p'



🏁 Script executed:

#!/bin/bash
# Compare with the similar function in cmt_encode_prometheus.c to see how it handles source labels
rg -n "initialize_temporary_metric|cmt_encode_prometheus" lib/cmetrics/src/cmt_encode_prometheus.c -A 40 | head -100



🏁 Script executed:

#!/bin/bash
# Check what cfl_sds_create does - look for its definition or behavior
rg -n "cfl_sds_create\(" -B 2 -A 8 lib/cmetrics/src/ | grep -A 8 "if.*source_label->name"



🏁 Script executed:

#!/bin/bash
# Look at lines around 93 in the splunk_hec file to see the NULL check pattern
cat -n lib/cmetrics/src/cmt_encode_splunk_hec.c | sed -n '85,100p'



Guard against NULL label names when cloning.

initialize_temporary_metric calls cfl_sds_create(source_label->name) where source_label->name can be NULL. While the code checks the result at line 93, it's safer to validate the source label name before the call, following the defensive pattern used elsewhere in the codebase (e.g., processor_labels/labels.c:775).

🛡️ Suggested guard
-        destination_label->name = cfl_sds_create(source_label->name);
+        if (source_label->name == NULL) {
+            free(destination_label);
+            destroy_temporary_metric_labels(destination);
+            return -1;
+        }
+        destination_label->name = cfl_sds_create(source_label->name);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@lib/cmetrics/src/cmt_encode_splunk_hec.c` around lines 73 - 103,
initialize_temporary_metric currently calls cfl_sds_create(source_label->name)
without checking source_label->name for NULL. Before calling cfl_sds_create in
initialize_temporary_metric, add a guard that checks whether source_label->name
is NULL and, if so, calls cfl_sds_create("") (empty string) instead. Treat
allocation failure the same way the function already does: free
destination_label, call destroy_temporary_metric_labels(destination), and
return -1. Reference the function initialize_temporary_metric and the
struct/field cmt_map_label->name when locating the change.
