Clone workers in streams #1585
base: graphite-base/1585
Conversation
Warning: This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite. This stack of pull requests is managed by Graphite. Learn more about stacking.
cba9652 to 2d4a39f
worker.worker.startStream(typeofStream.Message);
let allWorkers = workers.getAll().map((w) => w.worker);
if (this.config.cloned) {
  allWorkers = await Promise.all(allWorkers.map((w) => w.clone()));
Promise.all here can leak clones when one clone() rejects: successful clones aren’t tracked or cleaned up. Consider cloning sequentially with try/catch, cleaning up any successful clones before rethrowing.
-   allWorkers = await Promise.all(allWorkers.map((w) => w.clone()));
- }
+   const clones: WorkerClient[] = [];
+   try {
+     for (const w of allWorkers) {
+       const c = await w.clone();
+       clones.push(c);
+     }
+     allWorkers = clones;
+   } catch (err) {
+     for (const c of clones) {
+       try {
+         c.stopStreams();
+       } catch {}
+     }
+     throw err;
+   }
+ }
2d4a39f to 939f72b
a44ea29 to 6ee6e0f
939f72b to 67d5f3a
6ee6e0f to b96795f
7ecb6f0 to 1e4abb8
1e4abb8 to 72fa4b4

Add cloned worker support for background streams and make StreamsChaos.start async in streams.ts. Introduce StreamsConfig with a cloned flag, update StreamsChaos to optionally clone() WorkerClient instances before starting streams, and change RuntimeConfig to carry a backgroundStreams config instead of a boolean.

📍 Where to Start

Start with StreamsChaos construction and the start flow in streams.ts.

📊 Macroscope summarized 3d501d6. 7 files reviewed, 14 issues evaluated, 13 issues filtered, 0 comments posted
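To make the shape of the change concrete, here is a minimal sketch of how a cloned flag could drive the start flow. The StreamsConfig, WorkerClient, and StreamsChaos shapes below are assumptions for illustration; only clone(), startStream(), and typeofStream.Message appear in the diff above, and the review comment's concern about Promise.all applies here too.

```ts
// Sketch only: not the repo's actual types. Shows streams switching onto
// cloned workers when the `cloned` flag is set.
interface WorkerClient {
  clone(): Promise<WorkerClient>;
  startStream(streamType: string): void;
  stopStreams(): void;
}

interface StreamsConfig {
  cloned: boolean; // when true, run streams on clones instead of the originals
}

class StreamsChaos {
  constructor(
    private readonly config: StreamsConfig,
    private readonly workers: WorkerClient[],
  ) {}

  // `start` is async so the clones can be awaited before any stream begins.
  async start(streamType: string): Promise<void> {
    let targets = this.workers;
    if (this.config.cloned) {
      // The review comment above suggests cloning sequentially with cleanup
      // on failure instead of Promise.all, which can leak successful clones.
      targets = await Promise.all(targets.map((w) => w.clone()));
    }
    for (const w of targets) {
      w.startStream(streamType);
    }
  }
}
```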
🗂️ Filtered Issues
chaos/db.ts — 0 comments posted, 2 evaluated, 2 filtered
DbChaos.start does not guard against being called multiple times. Each call sets this.interval = setInterval(...) without clearing any existing interval, resulting in multiple intervals running concurrently and repeatedly applying locks. This can cause overlapping lock attempts and unexpected load. Add a guard (e.g., clear the existing interval or throw if already started) before setting a new one. [ Low confidence ]

impactedWorkerPercentage is used without validation in DbChaos.start. If it is NaN, negative, or >100, the condition Math.random() * 100 > impactedWorkerPercentage misbehaves (e.g., any comparison with NaN is false, locking all workers every interval). Validate the value is a number between 0 and 100 and handle invalid input (e.g., clamp or skip). [ Low confidence ]
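A minimal sketch of the guard and clamping described in the two issues above, assuming a hypothetical DbChaos shape; the field names and the direction of the random comparison are illustrative, not the repo's code.

```ts
// Illustrative only; names follow the issue text above, not the actual repo.
class DbChaos {
  private interval?: ReturnType<typeof setInterval>;

  constructor(private readonly impactedWorkerPercentage: number) {}

  start(intervalMs: number, lockRandomWorker: () => void): void {
    // Guard against double-start: clear any interval from a previous call.
    if (this.interval !== undefined) {
      clearInterval(this.interval);
    }
    // Clamp to [0, 100]; treat NaN / non-finite values as 0 (never lock).
    const pct = Number.isFinite(this.impactedWorkerPercentage)
      ? Math.min(100, Math.max(0, this.impactedWorkerPercentage))
      : 0;
    this.interval = setInterval(() => {
      // Lock a worker only when the roll falls inside the impacted percentage.
      if (Math.random() * 100 < pct) {
        lockRandomWorker();
      }
    }, intervalMs);
  }

  stop(): void {
    if (this.interval !== undefined) {
      clearInterval(this.interval);
      this.interval = undefined;
    }
  }
}
```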
chaos/streams.ts — 0 comments posted, 1 evaluated, 1 filtered

StreamsChaos.stop only calls stopStreams() on stored workers but never terminates cloned WorkerClient instances created in start when config.cloned is true. This leaves worker threads/process resources alive, causing leaks and lingering background activity. Ensure clones are terminated (e.g., await worker.terminate()) and cleanly disposed. [ Low confidence ]
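One way to address this, sketched under the assumption that WorkerClient exposes clone(), stopStreams(), and terminate() (terminate() is named in the issue text; the clone-tracking field and class shape are hypothetical):

```ts
// Sketch: track clones created in start() so stop() can dispose of them.
interface WorkerClient {
  clone(): Promise<WorkerClient>;
  startStream(streamType: string): void;
  stopStreams(): void;
  terminate(): Promise<void>;
}

class StreamsChaos {
  private clones: WorkerClient[] = [];

  constructor(
    private readonly cloned: boolean,
    private readonly workers: WorkerClient[],
  ) {}

  async start(streamType: string): Promise<void> {
    const targets = this.cloned
      ? (this.clones = await Promise.all(this.workers.map((w) => w.clone())))
      : this.workers;
    for (const w of targets) w.startStream(streamType);
  }

  async stop(): Promise<void> {
    // Stop streams on the stored workers as before.
    for (const w of this.workers) w.stopStreams();
    // Also stop and terminate every clone so no worker threads linger.
    for (const c of this.clones) {
      c.stopStreams();
      await c.terminate();
    }
    this.clones = [];
  }
}
```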
forks/cli.ts — 0 comments posted, 1 evaluated, 1 filtered

runForkDetection misinterprets per-run fork detection by using the total count of files in logs/cleaned for each run (getForkCount()), then accumulating that total into stats.forksDetected on every iteration. This causes overcounting across runs, and the per-run output shows cumulative totals rather than the forks detected in that run. Specifically: forkCount is the cumulative number of cleaned logs across all runs, but the code prints Run i: ... ${forkCount} fork(s) detected and does stats.forksDetected += forkCount. You should compute forks detected for the current run only (e.g., diff the count before/after cleaning, or base it on filenames associated with the current run), and increment stats by that delta. [ Out of scope ]
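A sketch of the before/after diff the issue suggests; the signature, cleanLogs, getForkCount, and the stats object are hypothetical stand-ins for whatever the CLI actually uses.

```ts
// Sketch: count forks detected in *this* run as a delta, not a running total.
async function runForkDetection(
  runs: number,
  cleanLogs: () => Promise<void>,  // hypothetical: cleans logs for one run
  getForkCount: () => number,      // hypothetical: total files in logs/cleaned
): Promise<{ forksDetected: number }> {
  const stats = { forksDetected: 0 };
  for (let i = 1; i <= runs; i++) {
    const before = getForkCount();
    await cleanLogs();
    const after = getForkCount();
    const forksThisRun = after - before; // only what this run added
    console.log(`Run ${i}: ${forksThisRun} fork(s) detected`);
    stats.forksDetected += forksThisRun;
  }
  return stats;
}
```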
helpers/versions.ts — 0 comments posted, 2 evaluated, 2 filtered

regressionClient matches the provided nodeBindings string against VersionList.find((v) => v.nodeBindings === nodeBindings). Callers pass sdk (e.g., "4.3.0") into regressionClient as the nodeBindings argument, which will not match VersionList.nodeBindings values (e.g., "1.7.0"). This causes a runtime error: SDK version ${nodeBindings} not found in VersionList. Ensure the lookup uses the correct field (nodeSDK vs nodeBindings) or pass the correct value from the caller. [ Out of scope ]

ClientClass.create is invoked without historySyncUrl and disableDeviceSync, while the initial attempt includes these options. This yields inconsistent client behavior depending on whether the first attempt fails. Preserve identical options across both paths. [ Low confidence ]
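A sketch of the corrected lookup, assuming VersionList entries carry both nodeSDK and nodeBindings fields as the issue text implies; the helper name and signature are illustrative.

```ts
// Sketch: select the version entry by the field that matches the caller's input.
interface VersionEntry {
  nodeSDK: string;      // e.g. "4.3.0"
  nodeBindings: string; // e.g. "1.7.0"
}

declare const VersionList: VersionEntry[];

function findBySdk(sdk: string): VersionEntry {
  // Callers pass an SDK version, so match nodeSDK rather than nodeBindings.
  const entry = VersionList.find((v) => v.nodeSDK === sdk);
  if (!entry) {
    throw new Error(`SDK version ${sdk} not found in VersionList`);
  }
  return entry;
}
```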
workers/main.ts — 0 comments posted, 4 evaluated, 4 filtered

getStats() may dereference this.client.debugInformation when it’s undefined. apiStatistics() is guarded by ?., but this.client.debugInformation.clearAllStatistics() is not, causing a possible runtime exception. Guard both calls or null-check debugInformation first. [ Out of scope ]

startSync launches an infinite while (true) loop without any cancellation mechanism, independent of stopStreams() or terminate(). Multiple invocations spawn multiple perpetual loops. This can cause runaway background activity and hinder clean shutdown. Provide an abort/cancel control or tie the loop to a tracked flag that’s cleared on terminate()/stopStreams(). [ Out of scope ]

startStream allows starting the same stream type multiple times. When the stream is already active, it does not return, creating another stream and overwriting the streamControllers entry for that type. Older streams cannot be aborted via endStream, leading to orphaned streams and potential leaks. Restore the early return when a stream is already active or manage multiple instances safely. [ Out of scope ]

In initMessageStream and initConversationStream, errors inside the async stream loop are re-thrown from within a void IIFE. This results in unhandled promise rejections, breaking the loop and potentially destabilizing the process without a controlled shutdown or retry. Handle errors by logging and gracefully breaking the loop or implementing a bounded retry with backoff, and ensure stream cleanup on all paths. [ Out of scope ]
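A compact sketch of two of the fixes described above: guarding both debugInformation calls, and tying the sync loop to a flag that terminate() clears. All names and shapes are illustrative; only the behaviors come from the issue text.

```ts
// Sketch only: illustrative client/worker shapes, not the repo's actual API.
interface DebugInformation {
  apiStatistics(): unknown;
  clearAllStatistics(): void;
}

interface Client {
  debugInformation?: DebugInformation;
  conversations: { sync(): Promise<void> };
}

class WorkerMain {
  private syncing = false;

  constructor(private readonly client: Client) {}

  getStats(): unknown {
    // Guard both calls, not just the first one.
    const debug = this.client.debugInformation;
    const stats = debug?.apiStatistics();
    debug?.clearAllStatistics();
    return stats;
  }

  startSync(intervalMs: number): void {
    if (this.syncing) return; // avoid spawning a second perpetual loop
    this.syncing = true;
    void (async () => {
      while (this.syncing) {
        try {
          await this.client.conversations.sync();
        } catch (err) {
          // Log and keep going rather than rejecting out of the IIFE.
          console.error("sync failed, retrying next tick", err);
        }
        await new Promise((resolve) => setTimeout(resolve, intervalMs));
      }
    })();
  }

  terminate(): void {
    // Clearing the flag lets the loop exit on its next iteration.
    this.syncing = false;
  }
}
```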
workers/manager.ts — 0 comments posted, 3 evaluated, 3 filtered

getWorkers no longer uses the mapped apiUrl values when the input is a Record<string, string>. Previously, the record’s value was forwarded to manager.createWorker(descriptor, sdkVersions[0], apiUrl). The current code ignores apiUrl and calls manager.createWorker(descriptor, randomSdk) for each descriptor, breaking the prior external contract and causing workers to be initialized without the intended API URL routing. [ Low confidence ]

getNextFolderName does not actually ensure the folder name is available. When .data exists, it returns a random 32-character alphanumeric string without checking fs.readdirSync or similar to confirm non-existence. This can collide with an existing folder and cause downstream failures when trying to create/use the directory. Given the comment "Helper function to get the next available folder name", this is a behavioral bug: availability is not verified. [ Low confidence ]

When .data exists, getNextFolderName now returns a 32-character alphanumeric string (including digits) instead of a deterministic single lowercase letter based on existing folders. Any callers expecting a short, [a-z] name or 'next' sequential behavior may break (validation, UI, path length constraints, or ordering). This introduces a runtime incompatibility risk with existing consumers. [ Low confidence ]
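A sketch of an availability check for the random name, assuming Node's fs module and a .data base directory as in the issue text; the name length and alphabet follow the description, and the function shape is illustrative.

```ts
// Sketch: verify the generated folder name doesn't collide with an existing one.
import * as fs from "node:fs";
import * as path from "node:path";

function getNextFolderName(baseDir = ".data"): string {
  const existing = new Set(
    fs.existsSync(baseDir) ? fs.readdirSync(baseDir) : [],
  );
  const alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
  // Retry until a 32-character name is not already taken.
  for (;;) {
    let name = "";
    for (let i = 0; i < 32; i++) {
      name += alphabet[Math.floor(Math.random() * alphabet.length)];
    }
    if (!existing.has(name)) {
      return name;
    }
  }
}

// Example: create the folder immediately to reduce the window for races.
const folder = getNextFolderName();
fs.mkdirSync(path.join(".data", folder), { recursive: true });
```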