-
-
Notifications
You must be signed in to change notification settings - Fork 4
fix: memory allocations in the debug mode #110
Conversation
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Important Auto Review SkippedReview was skipped due to path filters Files ignored due to path filters (1)
WalkthroughThe update to the Changes
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Review Status
Actionable comments generated: 1
Configuration used: CodeRabbit UI
Files selected for processing (1)
- pool/static_pool/debug.go (3 hunks)
Additional comments: 5
pool/static_pool/debug.go (5)
- 24-24: The comment on line 24 has been modified to clarify the purpose of reading the exit status, which is to prevent the process from becoming a zombie. This is a good practice as it enhances code readability and understanding.
- 36-36: Reducing the buffer size of the channel from an unspecified size to 5 on line 36 is a targeted optimization to reduce memory allocations in debug mode. However, ensure this buffer size is sufficient for the expected workload to avoid potential bottlenecks.
- 44-44: The deferred function starting on line 44 correctly handles the cleanup and logging for the stream worker. This includes closing the response channel, stopping the worker, and logging any errors encountered. This is a good practice for managing resources and providing debug information.
- 64-74: The handling of the stop signal for the stream on lines 64-74 is well-implemented. It includes logging the receipt of the stop signal, attempting to cancel the stream with a timeout, and transitioning the worker's state based on the outcome. This logic is crucial for ensuring that resources are properly managed and that the system can gracefully handle termination signals.
- 129-129: Creating a response channel with a buffer size of 1 on line 129 for non-stream responses is appropriate and aligns with the goal of optimizing memory usage in debug mode. This ensures that memory is efficiently used for synchronous operations.
// we have to set a stream timeout on every request | ||
switch sp.supervisedExec { | ||
case true: | ||
ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL) | ||
pld, next, errI := w.StreamIterWithContext(ctxT) | ||
cancelT() | ||
if errI != nil { | ||
sp.log.Warn("stream error", zap.Error(err)) | ||
|
||
resp <- newPExec(nil, errI) | ||
|
||
// move worker to the invalid state to restart | ||
w.State().Transition(fsm.StateInvalid) | ||
runtime.Goexit() | ||
} | ||
|
||
resp <- newPExec(pld, nil) | ||
|
||
if !next { | ||
w.State().Transition(fsm.StateReady) | ||
// we've got the last frame | ||
runtime.Goexit() | ||
} | ||
case false: | ||
// non supervised execution, can potentially hang here | ||
pld, next, errI := w.StreamIter() | ||
if errI != nil { | ||
sp.log.Warn("stream iter error", zap.Error(err)) | ||
// send error response | ||
resp <- newPExec(nil, errI) | ||
|
||
// move worker to the invalid state to restart | ||
w.State().Transition(fsm.StateInvalid) | ||
runtime.Goexit() | ||
} | ||
|
||
resp <- newPExec(pld, nil) | ||
|
||
resp <- newPExec(pld, nil) | ||
if !next { | ||
// we've got the last frame | ||
runtime.Goexit() | ||
if !next { | ||
w.State().Transition(fsm.StateReady) | ||
// we've got the last frame | ||
runtime.Goexit() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic for handling stream iteration based on the supervisedExec
flag from lines 79-121 introduces complexity but is necessary for supporting both supervised and unsupervised execution modes. However, there's repeated code for handling errors and transitioning worker states in both cases. Consider refactoring this logic into a separate function to adhere to the DRY (Don't Repeat Yourself) principle and simplify the code.
+ func handleStreamError(w *Worker, resp chan *PExec, err error) {
+ sp.log.Warn("stream error", zap.Error(err))
+ resp <- newPExec(nil, err)
+ w.State().Transition(fsm.StateInvalid)
+ runtime.Goexit()
+ }
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
// we have to set a stream timeout on every request | |
switch sp.supervisedExec { | |
case true: | |
ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL) | |
pld, next, errI := w.StreamIterWithContext(ctxT) | |
cancelT() | |
if errI != nil { | |
sp.log.Warn("stream error", zap.Error(err)) | |
resp <- newPExec(nil, errI) | |
// move worker to the invalid state to restart | |
w.State().Transition(fsm.StateInvalid) | |
runtime.Goexit() | |
} | |
resp <- newPExec(pld, nil) | |
if !next { | |
w.State().Transition(fsm.StateReady) | |
// we've got the last frame | |
runtime.Goexit() | |
} | |
case false: | |
// non supervised execution, can potentially hang here | |
pld, next, errI := w.StreamIter() | |
if errI != nil { | |
sp.log.Warn("stream iter error", zap.Error(err)) | |
// send error response | |
resp <- newPExec(nil, errI) | |
// move worker to the invalid state to restart | |
w.State().Transition(fsm.StateInvalid) | |
runtime.Goexit() | |
} | |
resp <- newPExec(pld, nil) | |
resp <- newPExec(pld, nil) | |
if !next { | |
// we've got the last frame | |
runtime.Goexit() | |
if !next { | |
w.State().Transition(fsm.StateReady) | |
// we've got the last frame | |
runtime.Goexit() | |
} | |
// we have to set a stream timeout on every request | |
switch sp.supervisedExec { | |
case true: | |
ctxT, cancelT := context.WithTimeout(context.Background(), sp.cfg.Supervisor.ExecTTL) | |
pld, next, errI := w.StreamIterWithContext(ctxT) | |
cancelT() | |
if errI != nil { | |
sp.log.Warn("stream error", zap.Error(err)) | |
resp <- newPExec(nil, errI) | |
// move worker to the invalid state to restart | |
w.State().Transition(fsm.StateInvalid) | |
runtime.Goexit() | |
} | |
resp <- newPExec(pld, nil) | |
if !next { | |
w.State().Transition(fsm.StateReady) | |
// we've got the last frame | |
runtime.Goexit() | |
} | |
case false: | |
// non supervised execution, can potentially hang here | |
pld, next, errI := w.StreamIter() | |
if errI != nil { | |
sp.log.Warn("stream iter error", zap.Error(err)) | |
// send error response | |
resp <- newPExec(nil, errI) | |
// move worker to the invalid state to restart | |
w.State().Transition(fsm.StateInvalid) | |
runtime.Goexit() | |
} | |
resp <- newPExec(pld, nil) | |
if !next { | |
w.State().Transition(fsm.StateReady) | |
// we've got the last frame | |
runtime.Goexit() | |
} | |
func handleStreamError(w *Worker, resp chan *PExec, err error) { | |
sp.log.Warn("stream error", zap.Error(err)) | |
resp <- newPExec(nil, err) | |
w.State().Transition(fsm.StateInvalid) | |
runtime.Goexit() | |
} |
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #110 +/- ##
==========================================
- Coverage 74.05% 72.36% -1.69%
==========================================
Files 23 23
Lines 2166 2204 +38
==========================================
- Hits 1604 1595 -9
- Misses 510 557 +47
Partials 52 52 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Valery Piashchynski <piashchynski.valery@gmail.com>
Reason for This PR
Description of Changes
License Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.
PR Checklist
[Author TODO: Meet these criteria.]
[Reviewer TODO: Verify that these criteria are met. Request changes if not]
git commit -s
).CHANGELOG.md
.Summary by CodeRabbit