-
-
Notifications
You must be signed in to change notification settings - Fork 4
fix: streaming responses lock-up under some conditions #105
Conversation
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
WalkthroughThe overarching changes involve enhancing the robustness and responsiveness of the worker pool and stream processing within a concurrent execution environment. This includes handling early client disconnections, improving state management, and increasing buffer sizes to prevent lock-ups. The adjustments aim to ensure that workers recover gracefully from interrupted streams and that the system can handle high volumes of data without stalling. Changes
Assessment against linked issues
Related issues
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit's AI:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 15
Configuration used: CodeRabbit UI
Files ignored due to filter (2)
- go.mod
- go.sum
Files selected for processing (6)
- ipc/pipe/pipe_factory_spawn_test.go (1 hunks)
- ipc/pipe/pipe_factory_test.go (1 hunks)
- pool/static_pool/debug.go (2 hunks)
- pool/static_pool/supervisor_test.go (2 hunks)
- pool/static_pool/workers_pool.go (4 hunks)
- worker/worker.go (7 hunks)
Files skipped from review due to trivial changes (1)
- pool/static_pool/supervisor_test.go
Additional comments: 4
pool/static_pool/debug.go (2)
74-74: Replacing
StreamIter
withStreamIterWithContext
is a good practice as it allows for better control over stream iteration, especially in cases where early termination might be needed. This change aligns with the PR's goal to handle early client disconnections more gracefully.76-77: Using
runtime.Goexit()
to terminate the goroutine after sending an error on theresp
channel is appropriate here. It ensures that the goroutine exits cleanly without affecting other running goroutines.ipc/pipe/pipe_factory_spawn_test.go (1)
- 399-399: Adding
w.State().Transition(fsm.StateReady)
afterw.Exec
calls ensures that the worker state is reset toReady
after each execution. This reflects the expected state transitions in the actual worker lifecycle and should lead to more accurate testing.ipc/pipe/pipe_factory_test.go (1)
- 424-424: Including
w.State().Transition(fsm.StateReady)
after eachw.Exec
call in the test ensures that the worker's state is consistent with the expected behavior after task execution. This change improves the test's fidelity to real-world scenarios.
if !w.State().Compare(fsm.StateWorking) { | ||
return errors.Errorf("worker is not in the Working state, actual state: (%s)", w.State().String()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The state check in StreamCancel
(lines 308-310) is good practice, but it should also ensure that the worker is transitioned to an appropriate state after cancellation.
+ // TODO: Verify that the worker state is correctly managed after a stream cancellation.
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
if !w.State().Compare(fsm.StateWorking) { | |
return errors.Errorf("worker is not in the Working state, actual state: (%s)", w.State().String()) | |
} | |
// TODO: Verify that the worker state is correctly managed after a stream cancellation. | |
if !w.State().Compare(fsm.StateWorking) { | |
return errors.Errorf("worker is not in the Working state, actual state: (%s)", w.State().String()) | |
} |
|
||
c := w.getCh() | ||
// set last used time | ||
w.State().SetLastUsed(uint64(time.Now().UnixNano())) | ||
w.State().Transition(fsm.StateWorking) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Exec
method (lines 386-390) should have a clear strategy for handling errors that occur during the execution of a payload, including cleanup and state management.
+ // TODO: Define a clear error handling strategy for the Exec method.
Committable suggestion
❗ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
c := w.getCh() | |
// set last used time | |
w.State().SetLastUsed(uint64(time.Now().UnixNano())) | |
w.State().Transition(fsm.StateWorking) | |
c := w.getCh() | |
// TODO: Define a clear error handling strategy for the Exec method. | |
// set last used time | |
w.State().SetLastUsed(uint64(time.Now().UnixNano())) | |
w.State().Transition(fsm.StateWorking) |
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #105 +/- ##
==========================================
- Coverage 77.20% 73.61% -3.59%
==========================================
Files 23 23
Lines 2088 2164 +76
==========================================
- Hits 1612 1593 -19
- Misses 421 519 +98
+ Partials 55 52 -3 ☔ View full report in Codecov by Sentry. |
Reason for This PR
closes: roadrunner-server/roadrunner#1830
Description of Changes
License Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.
PR Checklist
[Author TODO: Meet these criteria.]
[Reviewer TODO: Verify that these criteria are met. Request changes if not]
git commit -s
).CHANGELOG.md
.Summary by CodeRabbit
Bug Fixes
Refactor
Tests
Performance Improvements