@@ -713,35 +713,39 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
713713 ondrain ( ) ;
714714 }
715715
716+ function pause ( ) {
717+ // If the user unpiped during `dest.write()`, it is possible
718+ // to get stuck in a permanently paused state if that write
719+ // also returned false.
720+ // => Check whether `dest` is still a piping destination.
721+ if ( ! cleanedUp ) {
722+ if ( state . pipes . length === 1 && state . pipes [ 0 ] === dest ) {
723+ debug ( 'false write response, pause' , 0 ) ;
724+ state . awaitDrainWriters = dest ;
725+ state . multiAwaitDrain = false ;
726+ } else if ( state . pipes . length > 1 && state . pipes . includes ( dest ) ) {
727+ debug ( 'false write response, pause' , state . awaitDrainWriters . size ) ;
728+ state . awaitDrainWriters . add ( dest ) ;
729+ }
730+ src . pause ( ) ;
731+ }
732+ if ( ! ondrain ) {
733+ // When the dest drains, it reduces the awaitDrain counter
734+ // on the source. This would be more elegant with a .once()
735+ // handler in flow(), but adding and removing repeatedly is
736+ // too slow.
737+ ondrain = pipeOnDrain ( src , dest ) ;
738+ dest . on ( 'drain' , ondrain ) ;
739+ }
740+ }
741+
716742 src . on ( 'data' , ondata ) ;
717743 function ondata ( chunk ) {
718744 debug ( 'ondata' ) ;
719745 const ret = dest . write ( chunk ) ;
720746 debug ( 'dest.write' , ret ) ;
721747 if ( ret === false ) {
722- // If the user unpiped during `dest.write()`, it is possible
723- // to get stuck in a permanently paused state if that write
724- // also returned false.
725- // => Check whether `dest` is still a piping destination.
726- if ( ! cleanedUp ) {
727- if ( state . pipes . length === 1 && state . pipes [ 0 ] === dest ) {
728- debug ( 'false write response, pause' , 0 ) ;
729- state . awaitDrainWriters = dest ;
730- state . multiAwaitDrain = false ;
731- } else if ( state . pipes . length > 1 && state . pipes . includes ( dest ) ) {
732- debug ( 'false write response, pause' , state . awaitDrainWriters . size ) ;
733- state . awaitDrainWriters . add ( dest ) ;
734- }
735- src . pause ( ) ;
736- }
737- if ( ! ondrain ) {
738- // When the dest drains, it reduces the awaitDrain counter
739- // on the source. This would be more elegant with a .once()
740- // handler in flow(), but adding and removing repeatedly is
741- // too slow.
742- ondrain = pipeOnDrain ( src , dest ) ;
743- dest . on ( 'drain' , ondrain ) ;
744- }
748+ pause ( ) ;
745749 }
746750 }
747751
@@ -790,7 +794,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
790794
791795 if ( dest . writableNeedDrain === true ) {
792796 if ( state . flowing ) {
793- src . pause ( ) ;
797+ pause ( ) ;
794798 }
795799 } else if ( ! state . flowing ) {
796800 debug ( 'pipe resume' ) ;
0 commit comments