Background
A Synchronous Core Scheduler was implemented in Fluent Bit 1.9.10 for the CloudWatch plugin to ensure strict ordering when processing tasks. The PR makes it so that, for each output plugin instance, tasks received first are completed before later tasks are started.
Master #6413
1.9 #6339
It was written to allow the CloudWatch Logs plugin to use the async network stack (the sync network stack had issues) while keeping network operations synchronous, which is needed for the CloudWatch API.
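As a rough illustration, the ordering guarantee can be pictured as a per-instance FIFO queue in which a task is started only after every earlier task on that instance has completed. The sketch below is illustrative only; the names (sync_queue, sync_task, drain_in_order) are made up and this is not Fluent Bit's actual scheduler code:

```c
/* A minimal sketch of the ordering guarantee, not Fluent Bit's actual
 * scheduler code: each output instance owns a FIFO queue, and a queued
 * task starts only after every task queued before it on that instance
 * has completed. All names (sync_queue, sync_task, ...) are made up. */
#include <stdio.h>
#include <stdlib.h>

struct sync_task {
    int id;
    struct sync_task *next;
};

struct sync_queue {            /* one queue per output plugin instance */
    struct sync_task *head;
    struct sync_task *tail;
};

static void enqueue(struct sync_queue *q, int id)
{
    struct sync_task *t = calloc(1, sizeof(*t));
    t->id = id;
    if (q->tail) {
        q->tail->next = t;
    }
    else {
        q->head = t;
    }
    q->tail = t;
}

static void drain_in_order(struct sync_queue *q)
{
    /* run each task to completion before starting the next one */
    while (q->head) {
        struct sync_task *t = q->head;
        printf("instance: task %d flushed to completion\n", t->id);
        q->head = t->next;
        if (!q->head) {
            q->tail = NULL;
        }
        free(t);
    }
}

int main(void)
{
    struct sync_queue q = {0};
    enqueue(&q, 1);
    enqueue(&q, 2);    /* task 2 cannot start before task 1 completes */
    drain_in_order(&q);
    return 0;
}
```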
A bug was found when a single task is sent to multiple output plugins: the task may be prematurely deleted.
Sync Scheduler Task Instrumentation Walkthrough
Task instrumentation was added to 1.9: https://github.com/matthewfala/fluent-bit/commits/task-instrumentation
Each task is sent to 4 different output instances of cloudwatch_logs.
4 Task events are logged:
Task Dispatch
Task Increment
Task Decrement
Task Destroy
Task id and address are tracked.
Without any fix, the following instrumentation results:

[2023/02/09 19:05:52] [ info] [TCP_A] Dispatch : Plugin cloudwatch_logs.0 Task 0x7ffff06feb90 of id 2, users is: 0
[2023/02/09 19:05:52] [ info] [TCP_A] Dispatch : Plugin cloudwatch_logs.1 Task 0x7ffff06feb90 of id 2, users is: 0
[2023/02/09 19:05:52] [ info] [TCP_A] Dispatch : Plugin cloudwatch_logs.2 Task 0x7ffff06feb90 of id 2, users is: 0
[2023/02/09 19:05:52] [ info] [TCP_A] Dispatch : Plugin cloudwatch_logs.3 Task 0x7ffff06feb90 of id 2, users is: 0
[2023/02/09 19:05:53] [ info] [TCP_A] Increment: Plugin cloudwatch_logs.3 Task 0x7ffff06feb90 of id 2, users is: 0 -> 1
[2023/02/09 19:05:53] [ info] [TCP_A] Decrement: Plugin cloudwatch_logs.3 Task 0x7ffff06feb90 of id 2, users is: 1 -> 0
[2023/02/09 19:05:53] [ info] [TCP_A] Destroy : Task 0x7ffff06feb90 of id 2, users is: 0
[2023/02/09 19:05:53] [ info] [TCP_A] Increment: Plugin cloudwatch_logs.0 Task 0x7ffff06feb90 of id 0, users is: 0 -> 1
[2023/02/09 19:05:53] [error] [build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:109 errno=12] Cannot allocate memory
[2023/02/09 19:05:53] [error] [build/plugins/out_cloudwatch_logs/CMakeFiles/flb-plugin-out_cloudwatch_logs.dir/compiler_depend.ts:1110 errno=12] Cannot allocate memory
[2023/02/09 19:05:53] [error] [output:cloudwatch_logs:cloudwatch_logs.0] Failed to send events
[2023/02/09 19:06:02] [engine] caught signal (SIGSEGV)
Dispatch
The task is dispatched to each of the 4 cloudwatch_logs workers.
You can see that when the sync task scheduler is used, the task's user count is not incremented as the task is created and dispatched to each of the worker threads; it stays at 0. This is because a code change from the sync task scheduler delays the task's flush/dispatch function, which is what increments the user count, here:
https://github.com/fluent/fluent-bit/pull/6413/files#diff-7c07a6e4c34710b3b48808d85aa3cf23b6225c83e7cdf52a7063caa78f691ee6R78-R95
Without the sync task scheduler, the user count is incremented on each dispatch, which is the expected behavior.
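For contrast, here is a minimal sketch of that expected behavior, with the destroy-at-zero rule included. This is illustrative C, not the real flb_task implementation, and every name in it is made up:

```c
/* Sketch of the expected lifecycle (illustrative, not the real flb_task
 * code): every dispatch to an output increments users, every completed
 * flush decrements it, and the task is destroyed only when it hits 0. */
#include <stdio.h>
#include <stdlib.h>

struct task {
    int id;
    int users;   /* number of outputs currently holding a reference */
};

static void task_users_inc(struct task *t)
{
    t->users++;
    printf("Increment: task %d users %d -> %d\n", t->id, t->users - 1, t->users);
}

static void task_users_dec(struct task *t)
{
    t->users--;
    printf("Decrement: task %d users %d -> %d\n", t->id, t->users + 1, t->users);
    if (t->users == 0) {
        printf("Destroy  : task %d\n", t->id);
        free(t);
    }
}

int main(void)
{
    struct task *t = calloc(1, sizeof(*t));
    t->id = 2;

    /* dispatch to 4 outputs: users goes 0 -> 4, so one output finishing
     * early can never drop the count to 0 while others still need it */
    for (int i = 0; i < 4; i++) {
        task_users_inc(t);
    }

    /* each output completes its flush; only the final decrement
     * (1 -> 0) destroys the task */
    for (int i = 0; i < 4; i++) {
        task_users_dec(t);
    }
    return 0;
}
```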
One of the outputs processes the task
When a task begins to be processed, its user count is incremented from 0 to 1; the task is processed, and then its user count is decremented from 1 to 0.

[2023/02/09 19:05:53] [ info] [TCP_A] Increment: Plugin cloudwatch_logs.3 Task 0x7ffff06feb90 of id 2, users is: 0 -> 1
[2023/02/09 19:05:53] [ info] [TCP_A] Decrement: Plugin cloudwatch_logs.3 Task 0x7ffff06feb90 of id 2, users is: 1 -> 0
Destroy
When a task's user count hits 0, the task is cleaned up and destroyed:

[2023/02/09 19:05:53] [ info] [TCP_A] Destroy : Task 0x7ffff06feb90 of id 2, users is: 0
Other outputs segfault
But there are still other output instances that have had the task dispatched to them. When one of those outputs begins processing the task, a segfault occurs because the task has already been deleted:

[2023/02/09 19:05:53] [ info] [TCP_A] Increment: Plugin cloudwatch_logs.0 Task 0x7ffff06feb90 of id 0, users is: 0 -> 1
[2023/02/09 19:05:53] [error] [build/src/CMakeFiles/fluent-bit-static.dir/compiler_depend.ts:109 errno=12] Cannot allocate memory
[2023/02/09 19:05:53] [error] [build/plugins/out_cloudwatch_logs/CMakeFiles/flb-plugin-out_cloudwatch_logs.dir/compiler_depend.ts:1110 errno=12] Cannot allocate memory
[2023/02/09 19:05:53] [error] [output:cloudwatch_logs:cloudwatch_logs.0] Failed to send events
[2023/02/09 19:06:02] [engine] caught signal (SIGSEGV)
Problem
This is a problem with how the sync core scheduler tracks users. On dispatching tasks, the sync core scheduler:
1. Puts the task into a queue on the output plugin
2. Waits for all previous tasks in the output plugin queue to complete
3. Starts the task via flush/writing to the worker task pipe, which increments users
The time between steps 1 and 3 is the problem.
Previous behavior, before the sync core scheduler, was to start the task immediately via flush/writing to the worker task pipe, which increments users.
With the sync core scheduler, between steps 1 and 3 the task may be deleted, because the task sitting in the queue is not registered as having any users.
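Putting those steps together, here is a compressed sketch of the window, assuming (per the logs above) that the engine destroys a task whenever its user count reaches 0. All names are hypothetical and the logic is heavily simplified:

```c
/* Sketch of the buggy window (hypothetical names, simplified logic):
 * the task sits in the outputs' queues with users == 0, so the engine's
 * destroy-at-zero check can free it before the queued flushes run. */
#include <stdio.h>

struct task {
    int id;
    int users;
    int destroyed;
};

/* engine-side cleanup check, as inferred from the instrumentation */
static void maybe_destroy(struct task *t)
{
    if (t->users == 0) {
        t->destroyed = 1;   /* the real engine would free() the task */
        printf("task %d destroyed\n", t->id);
    }
}

int main(void)
{
    struct task t = { .id = 2, .users = 0, .destroyed = 0 };

    /* step 1: the sync scheduler queues the task on 4 outputs but does
     * NOT increment users, so the count stays at 0 while it waits */
    printf("task %d queued on 4 outputs, users=%d\n", t.id, t.users);

    /* step 3 runs for one output (cloudwatch_logs.3): 0 -> 1 -> 0 */
    t.users++;              /* flush starts on the first output */
    t.users--;              /* flush completes */
    maybe_destroy(&t);      /* users == 0: the task is destroyed... */

    /* ...yet 3 queued dispatches still point at it: use-after-free */
    if (t.destroyed) {
        printf("output .0 touches freed task %d -> SIGSEGV\n", t.id);
    }
    return 0;
}
```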
Solution
The solution is to somehow inform the task that it is being used while it is in the queue so that it is not deleted before it has a chance to be processed.
Solution Options
Option 1
The first option would be to directly add some task user increment logic right before enqueue is called: https://github.com/fluent/fluent-bit/pull/6413/files#diff-7c07a6e4c34710b3b48808d85aa3cf23b6225c83e7cdf52a7063caa78f691ee6R83
Then to modify flush/dispatch to change the logic, when the sync scheduler is used, to:
Do nothing (not increment anymore) when the task is successfully flushed
Decrement (instead of doing nothing) when the task fails to be flushed
I don't like this option because:
It modifies the flush function, which I don't want to touch for a sync scheduler change
It makes the code path convoluted by adding strange branches (see the sketch below)
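To make that second objection concrete, a hypothetical version of what Option 1 would do to the flush path might look like the following; do_flush, task_users_inc/dec, and the sync_scheduler flag are all made-up stand-ins, not real Fluent Bit code:

```c
/* Hypothetical sketch of Option 1's branching: flush now has to know
 * which scheduler is active and invert its user-count handling. */
#include <stdio.h>

struct task {
    int id;
    int users;
};

static int do_flush(struct task *t)    /* stand-in for the real flush */
{
    printf("flushing task %d\n", t->id);
    return 0;                          /* 0 = handed to worker OK */
}

static void task_users_inc(struct task *t) { t->users++; }
static void task_users_dec(struct task *t) { t->users--; }

static int task_flush(struct task *t, int sync_scheduler)
{
    int ret = do_flush(t);

    if (sync_scheduler) {
        /* users was already incremented at enqueue time, so: succeed ->
         * do nothing; fail -> undo the enqueue-time increment */
        if (ret != 0) {
            task_users_dec(t);
        }
    }
    else if (ret == 0) {
        /* classic path: take the reference only once flush succeeds */
        task_users_inc(t);
    }
    return ret;
}

int main(void)
{
    struct task t = { .id = 2, .users = 1 };  /* ref taken at enqueue */
    task_flush(&t, 1);
    printf("task %d users=%d\n", t.id, t.users);
    return 0;
}
```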
Option 2
The second option would be, when in synchronous mode, to always call increment when the task is added to the output plugin's queue.
Then, always call decrement just before flush, when the task is later taken out of the queue and is ready to be processed.
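Here is a minimal sketch of that hand-off (illustrative names, not the actual patch in #6817/#6818), under the assumption stated below that every increment and decrement runs on the control thread, so the engine's destroy check (modeled as engine_reap) can only run between these steps, never in the middle of one:

```c
/* Sketch of Option 2 (illustrative, not the actual patch): the queue
 * itself holds a reference while the task waits, then hands it over to
 * flush in back-to-back steps on the control thread, so engine_reap
 * never sees a queued task at users == 0. */
#include <stdio.h>
#include <stdlib.h>

struct task {
    int id;
    int users;
};

/* engine-side cleanup; in this sketch it runs only between
 * control-thread steps */
static struct task *engine_reap(struct task *t)
{
    if (t && t->users == 0) {
        printf("task %d destroyed\n", t->id);
        free(t);
        return NULL;
    }
    return t;
}

static void sync_enqueue(struct task *t)
{
    t->users++;                    /* the queue now counts as a user */
    printf("queued : task %d users=%d\n", t->id, t->users);
}

static void sync_dequeue_and_flush(struct task *t)
{
    t->users--;                    /* queue releases its reference... */
    t->users++;                    /* ...and flush immediately takes one,
                                      exactly as it always did */
    printf("flush  : task %d users=%d\n", t->id, t->users);
}

static void flush_done(struct task *t)
{
    t->users--;                    /* the output is finished with it */
    printf("done   : task %d users=%d\n", t->id, t->users);
}

int main(void)
{
    struct task *t = calloc(1, sizeof(*t));
    t->id = 2;

    sync_enqueue(t);               /* users: 0 -> 1 while waiting      */
    t = engine_reap(t);            /* users != 0, the task survives    */
    sync_dequeue_and_flush(t);     /* users: 1 -> 0 -> 1, same thread  */
    flush_done(t);                 /* users: 1 -> 0                    */
    engine_reap(t);                /* now destruction is safe          */
    return 0;
}
```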
I like this option because:
It doesn't modify the flush function and only modifies the sync scheduler code
It adds no branches, so the logic is streamlined
But what about the race conditions?
All of the user increments and decrements happen on the control thread, before tasks are dispatched to the workers and after those workers complete their work, so they cannot race with the worker threads.
Implementation
1.9 #6817
Master #6818
Testing:
More testing was done with and without duplicate tag cases. The code no longer segfaults, and data is transmitted to the output destinations.