Skip to content

Conversation

@jinyongchoi
Copy link
Contributor

@jinyongchoi jinyongchoi commented Jan 5, 2026

Problem
When Fluent Bit shuts down with grace period, threaded input plugins may still have data pending in their ring buffers. The engine could exit before this data was flushed to chunks, causing data loss.

Solution
ring_buffer: Add flb_ring_buffer_get_used() to check pending data size
input_thread: Add is_paused flag to track pause acknowledgement from threaded inputs
input_chunk: Bypass pause check during shutdown to allow ring buffer flush, add flb_input_chunk_total_ring_buffers_size() helper
engine: During grace period, wait for all threaded inputs to acknowledge pause and drain ring buffers before final shutdown
Fixes #11338


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
[SERVICE]
    flush 1
    grace 60
    log_level info
    log_file /tmp/testing/logs/testing.log
    parsers_file /tmp/testing/parsers.conf
    plugins_file /tmp/testing/plugins.conf
    http_server on
    http_listen 0.0.0.0
    http_port 22002

    storage.path /tmp/testing/storage
    storage.metrics on
    storage.max_chunks_up 512
    storage.sync full
    storage.checksum off
    storage.backlog.mem_limit 100M

[INPUT]
    Name tail
    Path /tmp/testing.input
    Tag testing
    Key message
    Offset_Key   log_offset

    Read_from_Head true
    Refresh_Interval 3
    Rotate_Wait 31557600

    Buffer_Chunk_Size 1MB
    Buffer_Max_Size 16MB
    Inotify_Watcher false

    storage.type filesystem
    storage.pause_on_chunks_overlimit true

    DB /tmp/testing/storage/testing.db
    DB.sync normal
    DB.locking false

    Alias input_log

    thread.ring_buffer.capacity  128
    Threaded true

[OUTPUT]
    Name file
    Match *
    File /tmp/testing.out
    workers 1
  • Debug log output from testing the change
    The "ring buffer pending" log is rarely seen because:
  • Default 250ms collector interval keeps the buffer empty
  • Shutdown bypass allows immediate data processing
FLB_DEV_RB_MS=10000 /opt/fluent-bit/bin/fluent-bit -v -c /etc/fluent-bit/fluent-bit.conf

[2026/01/05 17:15:14.231117707] [ info] [engine] ring buffer pending: 32 bytes
  • Attached Valgrind output that shows no leaks or memory corruption was found
==597334== 
==597334== HEAP SUMMARY:
==597334==     in use at exit: 0 bytes in 0 blocks
==597334==   total heap usage: 10,991,093 allocs, 10,991,093 frees, 12,233,758,702 bytes allocated
==597334== 
==597334== All heap blocks were freed -- no leaks are possible
==597334== 
==597334== For lists of detected and suppressed errors, rerun with: -s
==597334== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [ N/A ] Run local packaging test showing all targets (including any new ones) build.
  • [ N/A ] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • [ N/A ] Documentation required for this feature

Backporting

  • [ N/A ] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Reports total pending ring-buffer size during engine startup and shutdown
    • Adds ability to query per-buffer used size and overall ring-buffer usage
    • Tracks per-thread pause state to better coordinate threaded input pause/resume
  • Bug Fixes

    • Ensures pending ring-buffer data is collected and flushed during shutdown
    • Prevents resume/flush races during shutdown for more reliable termination

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Jan 5, 2026

📝 Walkthrough

Walkthrough

Adds APIs and state to report per-input ring-buffer usage and threaded-input pause status; integrates ring-buffer collection into engine startup/shutdown flows and updates pause/resume handling to ensure ring-buffer data is drained during graceful shutdown.

Changes

Cohort / File(s) Summary
Public headers
include/fluent-bit/flb_ring_buffer.h, include/fluent-bit/flb_input_chunk.h, include/fluent-bit/flb_input_thread.h
Added declarations: size_t flb_ring_buffer_get_used(struct flb_ring_buffer *rb), size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config), and volatile sig_atomic_t is_paused; field in struct flb_input_thread_instance.
Ring buffer implementation
src/flb_ring_buffer.c
Implemented flb_ring_buffer_get_used() returning used bytes via lwrb_get_full().
Input chunk management
src/flb_input_chunk.c
Added flb_input_chunk_get_total_ring_buffer_size(config); skip per-iteration pause checks when shutting down; added shutdown guards to resume logic and adjusted append behavior to allow flushing during shutdown.
Input thread handling
src/flb_input_thread.c
Set/clear is_paused on FLB_INPUT_THREAD_PAUSE/RESUME for threaded instances after plugin callbacks.
Engine startup/shutdown
src/flb_engine.c
Added all_threaded_inputs_paused() helper; query total ring-buffer size at startup/shutdown; trigger flb_input_chunk_ring_buffer_collector when ring-buffer usage exists; include ring-buffer and threaded-input pause checks in pending-work/shutdown logic.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Engine
    participant Threads as "Input Threads"
    participant Collector as "RB Collector"
    participant RingBuf as "Ring Buffers"

    Engine->>Engine: rb_size = flb_input_chunk_get_total_ring_buffer_size(config)
    alt rb_size > 0
        Engine->>Collector: flb_input_chunk_ring_buffer_collector(config, NULL)
        Collector->>RingBuf: drain ring buffers -> flush to chunks
        Collector-->>Engine: collection progress/completed
    end
    Engine->>Threads: all_threaded_inputs_paused()? (check is_paused flags)
    alt not all paused
        Threads-->>Engine: some inputs still active
        Engine->>Engine: increment pending work / wait
    else all paused
        Threads-->>Engine: all paused
    end
    Engine->>Engine: continue shutdown when pending==0
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

backport to v4.0.x

Suggested reviewers

  • edsiper

Poem

🐰 I nibble bytes beneath the moonlit beam,
Threads pause softly so collectors may gleam,
Buffers emptied, every record freed,
Shutdown tiptoes — no lost log to heed,
Hooray — the data hops home with me!

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 45.45% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'engine: fix threaded input data loss during shutdown' accurately and specifically summarizes the main objective of the PR.
Linked Issues check ✅ Passed The PR implements all coding requirements from issue #11338: adds flb_ring_buffer_get_used(), introduces is_paused flag, bypasses pause checks during shutdown, implements total ring buffer size calculation, and waits for threaded inputs to pause and drain during graceful shutdown.
Out of Scope Changes check ✅ Passed All changes are directly scoped to addressing ring buffer data loss during shutdown: ring buffer APIs, thread pause tracking, shutdown logic in engine and input_chunk, and pause state management in input_thread.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c6f4b64b0e

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Fix all issues with AI Agents 🤖
In @include/fluent-bit/flb_input_thread.h:
- Around line 92-98: Replace the volatile flag with a C11 atomic type so thread
reads/writes have proper synchronization: change the declaration of is_paused
from "volatile int is_paused;" to an atomic integer (e.g., "_Atomic int
is_paused;" or "atomic_int is_paused;") and add the required include
(<stdatomic.h>) where this header or its implementation uses the flag; update
any direct assignments/reads in the input thread and main thread to use plain
loads/stores (or atomic_store/atomic_load) as appropriate to preserve the same
semantics.
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a981f66 and c6f4b64.

📒 Files selected for processing (7)
  • include/fluent-bit/flb_input_chunk.h
  • include/fluent-bit/flb_input_thread.h
  • include/fluent-bit/flb_ring_buffer.h
  • src/flb_engine.c
  • src/flb_input_chunk.c
  • src/flb_input_thread.c
  • src/flb_ring_buffer.c
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • include/fluent-bit/flb_input_chunk.h
  • src/flb_engine.c
  • include/fluent-bit/flb_ring_buffer.h
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • src/flb_engine.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/flb_engine.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/flb_engine.c
🧬 Code graph analysis (4)
include/fluent-bit/flb_input_chunk.h (1)
src/flb_input_chunk.c (1)
  • flb_input_chunk_total_ring_buffers_size (3310-3324)
src/flb_engine.c (1)
src/flb_input_chunk.c (2)
  • flb_input_chunk_total_ring_buffers_size (3310-3324)
  • flb_input_chunk_ring_buffer_collector (3061-3111)
include/fluent-bit/flb_ring_buffer.h (1)
src/flb_ring_buffer.c (1)
  • flb_ring_buffer_get_used (205-208)
src/flb_input_chunk.c (3)
include/fluent-bit/flb_input.h (1)
  • flb_input_buf_paused (705-715)
src/flb_input.c (1)
  • flb_input_name (790-797)
src/flb_ring_buffer.c (1)
  • flb_ring_buffer_get_used (205-208)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (32)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: Agent
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COMPILER_STRICT_POINTER_TYPES=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-without-cxx (3.31.6)
  • GitHub Check: pr-compile-centos-7
  • GitHub Check: PR - fuzzing test
🔇 Additional comments (12)
src/flb_ring_buffer.c (1)

205-208: LGTM - Consistent with existing API style.

The function is a straightforward wrapper around lwrb_get_full(). The lack of NULL checks is consistent with other functions in this file (flb_ring_buffer_write, flb_ring_buffer_read), indicating that callers are expected to pass valid pointers.

src/flb_input_thread.c (2)

81-84: LGTM - Proper pause state tracking.

The is_paused flag is set after the pause callback completes, correctly signaling to the main thread that pause processing is finished. The conditional check protects against accessing thi when it's not available.


90-93: LGTM - Proper resume state tracking.

The is_paused flag is cleared after the resume callback completes, maintaining symmetry with the pause handler. The defensive checks are appropriate.

include/fluent-bit/flb_input_chunk.h (1)

165-165: LGTM - New API for total ring buffer size tracking.

The function signature is clear and the implementation (seen in src/flb_input_chunk.c:3309-3323) correctly sums the used space across all threaded inputs' ring buffers.

include/fluent-bit/flb_ring_buffer.h (1)

43-43: LGTM - New API exposes ring buffer usage.

The function provides a clean interface to query the amount of used space in the ring buffer, essential for the shutdown coordination logic. The implementation delegates to the underlying lwrb library.

src/flb_input_chunk.c (4)

2416-2426: LGTM! Correct guard to prevent input resume during shutdown.

The is_shutting_down check correctly prevents inputs from being resumed during the graceful shutdown phase, ensuring the engine can properly drain ring buffers without new data being ingested.


2672-2683: LGTM! Critical fix for data loss prevention during shutdown.

The conditional bypass of the pause check when is_shutting_down == FLB_TRUE correctly allows pending ring buffer data to be flushed to chunks during graceful shutdown. The logic is sound: normal operation respects backpressure, while shutdown prioritizes data preservation.


3074-3084: LGTM! Consistent shutdown bypass logic.

This mirrors the pause-bypass logic in input_chunk_append_raw, ensuring the ring buffer collector can fully drain all pending data during shutdown regardless of pause state. The symmetry between producer and collector is correct.


3305-3324: LGTM! Clean implementation of ring buffer size aggregation.

The function correctly:

  • Guards against non-threaded inputs and null ring buffers
  • Aggregates used space across all threaded input instances
  • Returns appropriate size_t type

The underlying lwrb_get_full should be safe for single-reader scenarios (main engine thread), which matches the usage pattern here.

src/flb_engine.c (3)

36-40: LGTM! Required includes for new functionality.

Both headers are necessary:

  • flb_input_thread.h for accessing the is_paused flag in threaded input instances
  • flb_input_chunk.h for flb_input_chunk_total_ring_buffers_size and flb_input_chunk_ring_buffer_collector

805-822: LGTM! Proper helper for threaded input pause verification.

The function correctly checks all threaded input instances to verify they have acknowledged the pause signal. The defensive check for both is_threaded and thi before accessing is_paused is appropriate.


1200-1213: Well-structured shutdown sequence for ring buffer draining.

The logic correctly:

  1. Includes ring buffer presence in pending work calculation (rb_size > 0 adds 1)
  2. Actively drains ring buffers each shutdown tick when data is pending
  3. Only checks thread pause completion after all other work is done (tasks=0, chunks=0, ring buffer empty)
  4. Continues waiting if threaded inputs haven't fully acknowledged pause

This ensures data isn't lost during shutdown by waiting for both ring buffer drainage and thread coordination.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request addresses a critical data loss issue during Fluent Bit shutdown where threaded input plugins with data pending in ring buffers could exit before flushing that data to chunks.

Key Changes:

  • Added ring buffer monitoring during shutdown to track pending data
  • Implemented pause acknowledgement tracking for threaded inputs
  • Modified pause checks to allow data flushing during shutdown

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
src/flb_ring_buffer.c Adds flb_ring_buffer_get_used() function to query pending data size in ring buffer
include/fluent-bit/flb_ring_buffer.h Declares new flb_ring_buffer_get_used() function
include/fluent-bit/flb_input_thread.h Adds is_paused volatile flag to track thread pause state
src/flb_input_thread.c Sets/clears is_paused flag on pause/resume operations
src/flb_input_chunk.c Bypasses pause checks during shutdown in collector and append functions; adds helper to calculate total ring buffer size
include/fluent-bit/flb_input_chunk.h Declares flb_input_chunk_total_ring_buffers_size() helper function
src/flb_engine.c Enhances shutdown logic to wait for threaded input pause acknowledgement and drain ring buffers before final exit
Comments suppressed due to low confidence (1)

src/flb_input_chunk.c:3090

  • At line 3086, the code attempts to read from ins->rb without checking if the ring buffer exists. While the previous check at line 3081 calls flb_input_buf_paused(ins) which might implicitly verify the ring buffer, there's no explicit null check for ins->rb before calling flb_ring_buffer_read. This could cause a null pointer dereference if an input instance doesn't have a ring buffer. Add a check: if (!ins->rb) continue; before the while loop starts.
            ret = flb_ring_buffer_read(ins->rb,
                                       (void *) &cr,
                                       sizeof(cr));
            if (ret != 0) {
                break;

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@edsiper
Copy link
Member

edsiper commented Jan 5, 2026

@codex review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c6f4b64b0e

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@jinyongchoi jinyongchoi force-pushed the fix/11265-ring-buffer branch from 9fbf309 to 3482153 Compare January 6, 2026 03:08
@jinyongchoi
Copy link
Contributor Author

While testing this PR, I found that thread.ring_buffer.capacity and thread.ring_buffer.window options are useful for tuning the Tail plugin in threaded mode.

I've added documentation for these parameters to the Tail input plugin: fluent/fluent-bit-docs#2320

@jinyongchoi
Copy link
Contributor Author

@edsiper @cosmo0920
Hello, this PR is ready for review. Looking forward to your feedback. Thanks!

@cosmo0920
Copy link
Contributor

@edsiper @cosmo0920
Hello, this PR is ready for review. Looking forward to your feedback. Thanks!

I added a comment about volatile sig_atomic_t for possible atomic operated variables.

@jinyongchoi
Copy link
Contributor Author

@edsiper @cosmo0920
Hello, this PR is ready for review. Looking forward to your feedback. Thanks!

I added a comment about volatile sig_atomic_t for possible atomic operated variables.

Thanks for pointing this out! The volatile sig_atomic_t pattern makes sense.

Add flb_ring_buffer_get_used() to retrieve the current used size
of a ring buffer. This is needed to check pending data during
shutdown sequence.

Signed-off-by: jinyong.choi <inimax801@gmail.com>
Add volatile is_paused flag to track when a threaded input has
completed its pause operation. This allows the engine to verify
all threaded inputs are fully paused before proceeding with shutdown.

Signed-off-by: jinyong.choi <inimax801@gmail.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @include/fluent-bit/flb_input_chunk.h:
- Line 165: The declaration of flb_input_chunk_get_total_ring_buffer_size should
mark its config parameter as const since it is only read; update the prototype
to use const struct flb_config *config and also update the corresponding
implementation in src/flb_input_chunk.c (the
flb_input_chunk_get_total_ring_buffer_size definition) to accept and use const
struct flb_config *config so signatures match and preserve API
const-correctness.
🧹 Nitpick comments (1)
include/fluent-bit/flb_input_chunk.h (1)

165-165: Consider adding documentation for the new public API.

Public APIs benefit from brief documentation explaining their purpose and behavior. Consider adding a comment above this declaration to describe that it returns the total used size across all threaded input ring buffers in the system.

📝 Example documentation
+/*
+ * Get total used size across all threaded input ring buffers.
+ * Returns the sum of used bytes in ring buffers for all threaded input instances.
+ */
 size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config);
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9a47ffd and 0ff6167.

📒 Files selected for processing (3)
  • include/fluent-bit/flb_input_chunk.h
  • include/fluent-bit/flb_input_thread.h
  • include/fluent-bit/flb_ring_buffer.h
🚧 Files skipped from review as they are similar to previous changes (1)
  • include/fluent-bit/flb_ring_buffer.h
🧰 Additional context used
🧠 Learnings (5)
📚 Learning: 2025-09-08T11:21:33.975Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 10851
File: include/fluent-bit/flb_simd.h:60-66
Timestamp: 2025-09-08T11:21:33.975Z
Learning: Fluent Bit currently only supports MSVC compiler on Windows, so additional compiler compatibility guards may be unnecessary for Windows-specific code paths.

Applied to files:

  • include/fluent-bit/flb_input_thread.h
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • include/fluent-bit/flb_input_thread.h
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • include/fluent-bit/flb_input_thread.h
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • include/fluent-bit/flb_input_thread.h
📚 Learning: 2025-12-08T05:21:45.014Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11250
File: src/ripser/flb_ripser_wrapper.cpp:73-106
Timestamp: 2025-12-08T05:21:45.014Z
Learning: In the TDA processor (processor_tda) for Fluent Bit, the dimension limit FLB_RIPSER_MAX_BETTI_DIM is intentionally capped at 3 because the plugin uses embed_dim=3 and delay=1 in practice, and computing higher dimensions (>4) would be computationally prohibitive. The 8-slot betti array allocation is conservative headroom.

Applied to files:

  • include/fluent-bit/flb_input_thread.h
🧬 Code graph analysis (1)
include/fluent-bit/flb_input_chunk.h (1)
src/flb_input_chunk.c (1)
  • flb_input_chunk_get_total_ring_buffer_size (3315-3329)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (31)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: pr-compile-without-cxx (3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COMPILER_STRICT_POINTER_TYPES=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-centos-7
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: PR - fuzzing test
🔇 Additional comments (2)
include/fluent-bit/flb_input_thread.h (2)

29-29: LGTM! Necessary include for the new field type.

The signal.h include is required for the sig_atomic_t type used in the new is_paused field.


94-99: LGTM! Appropriate type choice for shutdown synchronization.

The volatile sig_atomic_t type is well-suited for this pause acknowledgement flag. The type provides atomicity for simple set/check operations across threads without requiring heavier synchronization primitives. The documentation clearly explains the field's role in shutdown coordination.

Based on PR discussion, this type choice follows maintainer guidance from cosmo0920.

int flb_input_chunk_is_up(struct flb_input_chunk *ic);
void flb_input_chunk_update_output_instances(struct flb_input_chunk *ic,
size_t chunk_size);
size_t flb_input_chunk_get_total_ring_buffer_size(struct flb_config *config);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add const qualifier to the config parameter.

The implementation only reads from the config structure and does not modify it. The parameter should be declared as const struct flb_config *config to signal read-only intent and improve API safety.

🔧 Proposed fix
-size_t flb_input_chunk_get_total_ring_buffer_size(struct flb_config *config);
+size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config);

Note: The implementation in src/flb_input_chunk.c will also need to be updated to match.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
size_t flb_input_chunk_get_total_ring_buffer_size(struct flb_config *config);
size_t flb_input_chunk_get_total_ring_buffer_size(const struct flb_config *config);
🤖 Prompt for AI Agents
In @include/fluent-bit/flb_input_chunk.h at line 165, The declaration of
flb_input_chunk_get_total_ring_buffer_size should mark its config parameter as
const since it is only read; update the prototype to use const struct flb_config
*config and also update the corresponding implementation in
src/flb_input_chunk.c (the flb_input_chunk_get_total_ring_buffer_size
definition) to accept and use const struct flb_config *config so signatures
match and preserve API const-correctness.

During shutdown (is_shutting_down=TRUE), bypass the pause check to
allow flushing remaining ring buffer data. Also add helper function
flb_input_chunk_total_ring_buffers_size() to calculate total pending
data across all threaded inputs, and improve cleanup logging.

Signed-off-by: jinyong.choi <inimax801@gmail.com>
During grace period, ensure all threaded inputs have acknowledged
pause before final shutdown. Also drain pending ring buffer data
to prevent data loss. This fixes a race condition where the engine
could exit while threaded inputs still had buffered data.

Signed-off-by: jinyong.choi <inimax801@gmail.com>
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
src/flb_engine.c (2)

805-832: Solid implementation with appropriate defensive checks.

The function correctly determines if all threaded inputs have acknowledged pause. The defensive logic to skip inputs without pause/resume callbacks or context is good practice. The use of volatile sig_atomic_t for is_paused (as mentioned in PR comments) provides adequate atomicity for this read operation.

📝 Optional: Add comment explaining skip rationale

Consider adding a brief comment above line 820 explaining why these specific conditions are checked:

             */
+            /* Only check inputs that initialized successfully and support pause/resume */
             if (in->p->cb_pause == NULL || in->p->cb_resume == NULL ||

This would help future readers understand the intent behind skipping certain inputs.


1194-1224: Correct shutdown sequencing for ring buffer draining.

The shutdown logic correctly integrates ring buffer awareness:

  1. Includes ring buffer data in pending work count (line 1210)
  2. Triggers collection when data is present (lines 1212-1215)
  3. Checks pause acknowledgement only after all other work is complete (lines 1218-1223)

This sequencing ensures ring buffers are drained before verifying that threaded inputs have acknowledged pause, which is critical for data integrity during graceful shutdown.

♻️ Optional: Clarify pending work calculation

Line 1210 mixes integer counts with a boolean expression. While functionally correct, it could be clearer:

-                    ret = tasks + mem_chunks + fs_chunks + (rb_size > 0);
+                    /* Count all pending work: tasks, chunks, and ring buffers */
+                    ret = tasks + mem_chunks + fs_chunks;
+                    if (rb_size > 0) {
+                        ret++;  /* ring buffer has pending data */
+                    }

This makes the intent more explicit, though the current form is concise and correct.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0ff6167 and b1884fa.

📒 Files selected for processing (3)
  • include/fluent-bit/flb_input_chunk.h
  • src/flb_engine.c
  • src/flb_input_chunk.c
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2025-11-21T06:23:29.770Z
Learnt from: cosmo0920
Repo: fluent/fluent-bit PR: 11171
File: include/fluent-bit/flb_lib.h:52-53
Timestamp: 2025-11-21T06:23:29.770Z
Learning: In Fluent Bit core (fluent/fluent-bit repository), function descriptions/documentation are not required for newly added functions in header files.

Applied to files:

  • include/fluent-bit/flb_input_chunk.h
  • src/flb_engine.c
📚 Learning: 2025-08-29T06:25:02.561Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:7-7
Timestamp: 2025-08-29T06:25:02.561Z
Learning: In Fluent Bit, ZSTD (zstandard) compression library is bundled directly in the source tree at `lib/zstd-1.5.7` and is built unconditionally as a static library. Unlike optional external dependencies, ZSTD does not use conditional compilation guards like `FLB_HAVE_ZSTD` and is always available. Headers like `<fluent-bit/flb_zstd.h>` can be included directly without guards.

Applied to files:

  • src/flb_engine.c
📚 Learning: 2025-08-31T12:46:11.940Z
Learnt from: ThomasDevoogdt
Repo: fluent/fluent-bit PR: 9277
File: .github/workflows/pr-compile-check.yaml:147-151
Timestamp: 2025-08-31T12:46:11.940Z
Learning: In fluent-bit CMakeLists.txt, the system library preference flags are defined as FLB_PREFER_SYSTEM_LIB_ZSTD and FLB_PREFER_SYSTEM_LIB_KAFKA with the FLB_ prefix.

Applied to files:

  • src/flb_engine.c
📚 Learning: 2025-08-29T06:25:27.250Z
Learnt from: shadowshot-x
Repo: fluent/fluent-bit PR: 10794
File: tests/internal/aws_compress.c:93-107
Timestamp: 2025-08-29T06:25:27.250Z
Learning: In Fluent Bit, ZSTD compression is enabled by default and is treated as a core dependency, not requiring conditional compilation guards like `#ifdef FLB_HAVE_ZSTD`. Unlike some other optional components such as ARROW/PARQUET (which use `#ifdef FLB_HAVE_ARROW` guards), ZSTD support is always available and doesn't need build-time conditionals. ZSTD headers are included directly without guards across multiple plugins and core components.

Applied to files:

  • src/flb_engine.c
🧬 Code graph analysis (3)
include/fluent-bit/flb_input_chunk.h (1)
src/flb_input_chunk.c (1)
  • flb_input_chunk_get_total_ring_buffer_size (3315-3329)
src/flb_engine.c (1)
src/flb_input_chunk.c (2)
  • flb_input_chunk_get_total_ring_buffer_size (3315-3329)
  • flb_input_chunk_ring_buffer_collector (3061-3116)
src/flb_input_chunk.c (3)
include/fluent-bit/flb_input.h (1)
  • flb_input_buf_paused (708-718)
src/flb_input.c (1)
  • flb_input_name (802-809)
src/flb_ring_buffer.c (1)
  • flb_ring_buffer_get_used (205-208)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (31)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COMPILER_STRICT_POINTER_TYPES=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-without-cxx (3.31.6)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-centos-7
🔇 Additional comments (6)
include/fluent-bit/flb_input_chunk.h (1)

165-165: LGTM! Well-designed API.

The new function signature is appropriate for querying total ring buffer usage during shutdown. The const-correctness and size_t return type are suitable for this read-only operation.

src/flb_input_chunk.c (4)

2413-2440: Correctly prevents resume during shutdown.

The addition of is_shutting_down == FLB_FALSE guards at lines 2416 and 2430 ensures inputs remain paused during shutdown, preventing new data ingestion while allowing existing ring buffer data to be flushed. This is essential for graceful shutdown without data loss.


2672-2683: Critical for ring buffer draining during shutdown.

This bypass allows ring buffer data to be appended to chunks even when inputs are paused, which is essential for flushing remaining buffers during graceful shutdown. The detailed comment clearly explains the rationale and behavior difference between normal operation and shutdown.


3074-3089: Completes the ring buffer draining mechanism.

This bypass in the collector complements the append_raw bypass (lines 2672-2683), ensuring ring buffer data can be both read and written during shutdown regardless of pause state. The comprehensive comment explains the dual behavior clearly.


3315-3329: Well-implemented ring buffer size aggregation.

The function correctly iterates through all input instances, safely checks for threaded inputs with ring buffers, and aggregates their used sizes. The logic is clear and defensive against null ring buffer pointers.

src/flb_engine.c (1)

36-40: Necessary includes for shutdown integration.

The new includes provide access to threaded input state tracking (flb_input_thread.h) and ring buffer operations (flb_input_chunk.h), which are required for the shutdown logic enhancements.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

engine: Ring buffer data loss during graceful shutdown with threaded inputs(in_tail)

3 participants