@@ -35,7 +35,7 @@ const {
35
35
const { exitCodes : { kUnsettledTopLevelAwait } } = internalBinding ( 'errors' ) ;
36
36
const { URL } = require ( 'internal/url' ) ;
37
37
const { canParse : URLCanParse } = internalBinding ( 'url' ) ;
38
- const { receiveMessageOnPort, isMainThread } = require ( 'worker_threads' ) ;
38
+ const { receiveMessageOnPort } = require ( 'worker_threads' ) ;
39
39
const {
40
40
isAnyArrayBuffer,
41
41
isArrayBufferView,
@@ -482,8 +482,6 @@ class HooksProxy {
482
482
*/
483
483
#worker;
484
484
485
- #portToHooksThread;
486
-
487
485
/**
488
486
* The last notification ID received from the worker. This is used to detect
489
487
* if the worker has already sent a notification before putting the main
@@ -501,38 +499,26 @@ class HooksProxy {
501
499
#isReady = false ;
502
500
503
501
constructor ( ) {
504
- const { InternalWorker, hooksPort } = require ( 'internal/worker' ) ;
502
+ const { InternalWorker } = require ( 'internal/worker' ) ;
503
+ MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
504
+
505
505
const lock = new SharedArrayBuffer ( SHARED_MEMORY_BYTE_LENGTH ) ;
506
506
this . #lock = new Int32Array ( lock ) ;
507
507
508
- if ( isMainThread ) {
509
- // Main thread is the only one that creates the internal single hooks worker
510
- this . #worker = new InternalWorker ( loaderWorkerId , {
511
- stderr : false ,
512
- stdin : false ,
513
- stdout : false ,
514
- trackUnmanagedFds : false ,
515
- workerData : {
516
- lock,
517
- } ,
518
- } ) ;
519
- this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
520
- this . #worker. on ( 'exit' , process . exit ) ;
521
- this . #portToHooksThread = this . #worker;
522
- } else {
523
- this . #portToHooksThread = hooksPort ;
524
- }
508
+ this . #worker = new InternalWorker ( loaderWorkerId , {
509
+ stderr : false ,
510
+ stdin : false ,
511
+ stdout : false ,
512
+ trackUnmanagedFds : false ,
513
+ workerData : {
514
+ lock,
515
+ } ,
516
+ } ) ;
517
+ this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
518
+ this . #worker. on ( 'exit' , process . exit ) ;
525
519
}
526
520
527
521
waitForWorker ( ) {
528
- // There is one Hooks instance for each worker thread. But only one of these Hooks instances
529
- // has an InternalWorker. That was the Hooks instance created for the main thread.
530
- // It means for all Hooks instances that are not on the main thread => they are ready because they
531
- // delegate to the single InternalWorker anyway.
532
- if ( ! isMainThread ) {
533
- return ;
534
- }
535
-
536
522
if ( ! this . #isReady) {
537
523
const { kIsOnline } = require ( 'internal/worker' ) ;
538
524
if ( ! this . #worker[ kIsOnline ] ) {
@@ -549,37 +535,6 @@ class HooksProxy {
549
535
}
550
536
}
551
537
552
- #postMessageToWorker( method , type , transferList , args ) {
553
- this . waitForWorker ( ) ;
554
-
555
- MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
556
-
557
- const {
558
- port1 : fromHooksThread ,
559
- port2 : toHooksThread ,
560
- } = new MessageChannel ( ) ;
561
-
562
- // Pass work to the worker.
563
- debug ( `post ${ type } message to worker` , { method, args, transferList } ) ;
564
- const usedTransferList = [ toHooksThread ] ;
565
- if ( transferList ) {
566
- ArrayPrototypePushApply ( usedTransferList , transferList ) ;
567
- }
568
-
569
- this . #portToHooksThread. postMessage (
570
- {
571
- __proto__ : null ,
572
- args,
573
- lock : this . #lock,
574
- method,
575
- port : toHooksThread ,
576
- } ,
577
- usedTransferList ,
578
- ) ;
579
-
580
- return fromHooksThread ;
581
- }
582
-
583
538
/**
584
539
* Invoke a remote method asynchronously.
585
540
* @param {string } method Method to invoke
@@ -588,7 +543,22 @@ class HooksProxy {
588
543
* @returns {Promise<any> }
589
544
*/
590
545
async makeAsyncRequest ( method , transferList , ...args ) {
591
- const fromHooksThread = this . #postMessageToWorker( method , 'Async' , transferList , args ) ;
546
+ this . waitForWorker ( ) ;
547
+
548
+ MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
549
+ const asyncCommChannel = new MessageChannel ( ) ;
550
+
551
+ // Pass work to the worker.
552
+ debug ( 'post async message to worker' , { method, args, transferList } ) ;
553
+ const finalTransferList = [ asyncCommChannel . port2 ] ;
554
+ if ( transferList ) {
555
+ ArrayPrototypePushApply ( finalTransferList , transferList ) ;
556
+ }
557
+ this . #worker. postMessage ( {
558
+ __proto__ : null ,
559
+ method, args,
560
+ port : asyncCommChannel . port2 ,
561
+ } , finalTransferList ) ;
592
562
593
563
if ( this . #numberOfPendingAsyncResponses++ === 0 ) {
594
564
// On the next lines, the main thread will await a response from the worker thread that might
@@ -597,11 +567,7 @@ class HooksProxy {
597
567
// However we want to keep the process alive until the worker thread responds (or until the
598
568
// event loop of the worker thread is also empty), so we ref the worker until we get all the
599
569
// responses back.
600
- if ( this . #worker) {
601
- this . #worker. ref ( ) ;
602
- } else {
603
- this . #portToHooksThread. ref ( ) ;
604
- }
570
+ this . #worker. ref ( ) ;
605
571
}
606
572
607
573
let response ;
@@ -610,26 +576,18 @@ class HooksProxy {
610
576
await AtomicsWaitAsync ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) . value ;
611
577
this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
612
578
613
- response = receiveMessageOnPort ( fromHooksThread ) ;
579
+ response = receiveMessageOnPort ( asyncCommChannel . port1 ) ;
614
580
} while ( response == null ) ;
615
581
debug ( 'got async response from worker' , { method, args } , this . #lock) ;
616
582
617
583
if ( -- this . #numberOfPendingAsyncResponses === 0 ) {
618
584
// We got all the responses from the worker, its job is done (until next time).
619
- if ( this . #worker) {
620
- this . #worker. unref ( ) ;
621
- } else {
622
- this . #portToHooksThread. unref ( ) ;
623
- }
624
- }
625
-
626
- if ( response . message . status === 'exit' ) {
627
- process . exit ( response . message . body ) ;
585
+ this . #worker. unref ( ) ;
628
586
}
629
587
630
- fromHooksThread . close ( ) ;
631
-
632
- return this . #unwrapMessage ( response ) ;
588
+ const body = this . #unwrapMessage ( response ) ;
589
+ asyncCommChannel . port1 . close ( ) ;
590
+ return body ;
633
591
}
634
592
635
593
/**
@@ -640,7 +598,11 @@ class HooksProxy {
640
598
* @returns {any }
641
599
*/
642
600
makeSyncRequest ( method , transferList , ...args ) {
643
- const fromHooksThread = this . #postMessageToWorker( method , 'Sync' , transferList , args ) ;
601
+ this . waitForWorker ( ) ;
602
+
603
+ // Pass work to the worker.
604
+ debug ( 'post sync message to worker' , { method, args, transferList } ) ;
605
+ this . #worker. postMessage ( { __proto__ : null , method, args } , transferList ) ;
644
606
645
607
let response ;
646
608
do {
@@ -649,17 +611,14 @@ class HooksProxy {
649
611
AtomicsWait ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) ;
650
612
this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
651
613
652
- response = receiveMessageOnPort ( fromHooksThread ) ;
614
+ response = this . #worker . receiveMessageSync ( ) ;
653
615
} while ( response == null ) ;
654
616
debug ( 'got sync response from worker' , { method, args } ) ;
655
617
if ( response . message . status === 'never-settle' ) {
656
618
process . exit ( kUnsettledTopLevelAwait ) ;
657
619
} else if ( response . message . status === 'exit' ) {
658
620
process . exit ( response . message . body ) ;
659
621
}
660
-
661
- fromHooksThread . close ( ) ;
662
-
663
622
return this . #unwrapMessage( response ) ;
664
623
}
665
624
0 commit comments