-
Notifications
You must be signed in to change notification settings - Fork 1.9k
in_emitter: in_forward: Ensure ring buffer on threaded mode in emitter #10950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
in_emitter: in_forward: Ensure ring buffer on threaded mode in emitter #10950
Conversation
…ine parser Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
WalkthroughUpdates emitter ring buffer initialization logic in plugins/in_emitter/emitter.c for threaded inputs and adjusts collector FD handling and logging. Adds a new runtime test in tests/runtime/in_forward.c to validate a threaded Forward input combined with a multiline processor and a lib output callback counter. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant In as Input (threaded)
participant Em as Emitter ctx
participant Col as Collector
Note over In,Em: Initialization in threaded mode
In->>Em: init()
alt ring_buffer_size == 0
rect rgba(200,230,255,0.3)
Em->>Em: set default ring buffer size
Em->>Col: register collector (time-based)
Col-->>Em: return collector FD
Em->>Em: store ctx.coll_fd
end
else ring buffer already sized
Em->>Em: skip re-init
end
Em-->>In: return status (based on coll_fd)
sequenceDiagram
autonumber
participant Client as Forward client
participant Eng as Engine
participant IFw as in_forward (threaded)
participant MP as Multiline Processor
participant Lib as lib output
participant CB as cb_count_only
Client->>Eng: Send Forward frame ["logs", ts, {"log":"hello"}]
Eng->>IFw: Dispatch input event
IFw->>MP: Process record (partial_message mode)
MP-->>IFw: Emit processed record
IFw->>Lib: Deliver record for tag "logs"
Lib->>CB: Invoke callback with record
CB-->>Lib: Increment counter, free record
Lib-->>Eng: Output observed (count >= 1)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested labels
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/in_emitter/emitter.c (1)
187-190: Fix leak when ring buffer write failsIf flb_ring_buffer_write() returns non‑zero (buffer full/error), the allocated tag and msgpack buffer are not freed, leaking memory. Release them on failure.
Apply this diff:
- return flb_ring_buffer_write(ctx->msgs, - (void *) &temporary_chunk, - sizeof(struct em_chunk)); + { + int wret = flb_ring_buffer_write(ctx->msgs, + (void *) &temporary_chunk, + sizeof(struct em_chunk)); + if (wret != 0) { + /* ring buffer full or error: release allocated resources */ + flb_sds_destroy(temporary_chunk.tag); + msgpack_sbuffer_destroy(&temporary_chunk.mp_sbuf); + } + return wret; + }
🧹 Nitpick comments (2)
plugins/in_emitter/emitter.c (1)
281-286: Right‑size ring buffer capacity to stored object sizeCapacity is computed with sizeof(void*) but entries written are struct em_chunk. Size the buffer in bytes for the intended number of chunks to avoid premature “full” conditions.
Proposed change:
- ctx->msgs = flb_ring_buffer_create(sizeof(void *) * ctx->ring_buffer_size); + ctx->msgs = flb_ring_buffer_create(sizeof(struct em_chunk) * ctx->ring_buffer_size);Please confirm flb_ring_buffer_create expects capacity in bytes (not items). If it expects items, rename ring_buffer_size accordingly and pass the correct item count.
tests/runtime/in_forward.c (1)
801-901: Make threaded test use the synchronized counterRemove the unsynchronized local counter and rely on the shared helpers; also clear the counter before starting.
Apply this diff:
void flb_test_threaded_forward_issue_10946() { struct flb_lib_out_cb cb = {0}; flb_ctx_t *ctx; - int in_ffd, out_ffd, ret; - int out_count = 0; + int in_ffd, out_ffd, ret; flb_sockfd_t fd; char *buf; size_t size; int root_type; struct flb_processor *proc; struct flb_processor_unit *pu; @@ - cb.cb = cb_count_only; - cb.data = &out_count; + clear_output_num(); + cb.cb = cb_count_only; + cb.data = NULL; @@ - /* With the fix, at least one record must arrive */ - TEST_CHECK(out_count > 0); - if (!TEST_CHECK(out_count > 0)) { + /* With the fix, at least one record must arrive */ + ret = get_output_num(); + TEST_CHECK(ret > 0); + if (!TEST_CHECK(ret > 0)) { TEST_MSG("no outputs with threaded+multiline; emitter RB/collector likely missing"); }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
plugins/in_emitter/emitter.c(2 hunks)tests/runtime/in_forward.c(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
tests/runtime/in_forward.c (4)
include/fluent-bit/flb_mem.h (1)
flb_free(126-128)src/flb_lib.c (10)
flb_create(138-220)flb_service_set(647-673)flb_input(261-271)flb_input_set(300-330)flb_input_set_processor(332-348)flb_output(274-284)flb_output_set(515-546)flb_start(914-925)flb_stop(942-985)flb_destroy(223-258)src/flb_processor.c (3)
flb_processor_create(134-160)flb_processor_unit_create(162-308)flb_processor_unit_set_property(580-612)src/flb_pack.c (1)
flb_pack_json(530-535)
plugins/in_emitter/emitter.c (1)
src/flb_input.c (1)
flb_input_set_collector_time(1685-1704)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (29)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: PR - fuzzing test
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-centos-7
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
🔇 Additional comments (3)
plugins/in_emitter/emitter.c (2)
288-291: Correctly capturing collector id for pause/resume — LGTMStoring the return value of flb_input_set_collector_time into ctx->coll_fd fixes the pause/resume linkage. Returning success based on the stored id also looks good.
321-325: Auto‑enable ring buffer for threaded inputs — LGTMCondition looks right and the debug message is clear. Good simplification.
tests/runtime/in_forward.c (1)
913-915: Registering the new regression test — LGTMThe test is added to TEST_LIST under a clear name.
5c1e8f0 to
8399e16
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
tests/runtime/in_forward.c(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/runtime/in_forward.c (4)
include/fluent-bit/flb_mem.h (1)
flb_free(126-128)src/flb_lib.c (10)
flb_create(138-220)flb_service_set(647-673)flb_input(261-271)flb_input_set(300-330)flb_input_set_processor(332-348)flb_output(274-284)flb_output_set(515-546)flb_start(914-925)flb_stop(942-985)flb_destroy(223-258)src/flb_processor.c (3)
flb_processor_create(134-160)flb_processor_unit_create(162-308)flb_processor_unit_set_property(580-612)src/flb_pack.c (1)
flb_pack_json(530-535)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (29)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: PR - fuzzing test
- GitHub Check: pr-compile-centos-7
Signed-off-by: Hiroshi Hatake <hiroshi@chronosphere.io>
8399e16 to
281ae15
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
tests/runtime/in_forward.c(2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/runtime/in_forward.c (4)
include/fluent-bit/flb_mem.h (1)
flb_free(126-128)src/flb_lib.c (9)
flb_create(138-220)flb_input(261-271)flb_input_set(300-330)flb_input_set_processor(332-348)flb_output(274-284)flb_output_set(515-546)flb_start(914-925)flb_stop(942-985)flb_destroy(223-258)src/flb_processor.c (2)
flb_processor_create(134-160)flb_processor_unit_set_property(580-612)src/flb_pack.c (1)
flb_pack_json(530-535)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (29)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
- GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
- GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
- GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
- GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
- GitHub Check: pr-compile-centos-7
- GitHub Check: PR - fuzzing test
To ensure ring buffer creations before starting in_emitter context,
we need to handle coll_fd correctly and avoid to use heuristic approach to distinguish whether threaded mode or not.
This PR prevents for SEGV in such cases.
Plus, added a regression test case which was reported in #10946.
Closes ##10946.
Enter
[N/A]in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-testlabel to test for all targets (requires maintainer to do).Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
Bug Fixes
Tests