@@ -118,6 +118,7 @@ const kHasFlowing = 1 << 23;
118118const kFlowing = 1 << 24 ;
119119const kHasPaused = 1 << 25 ;
120120const kPaused = 1 << 26 ;
121+ const kDataListening = 1 << 27 ;
121122
122123// TODO(benjamingr) it is likely slower to do it this way than with free functions
123124function makeBitMapDescriptor ( bit ) {
@@ -527,8 +528,7 @@ function canPushMore(state) {
527528}
528529
529530function addChunk ( stream , state , chunk , addToFront ) {
530- if ( ( state [ kState ] & ( kFlowing | kSync ) ) === kFlowing && state . length === 0 &&
531- stream . listenerCount ( 'data' ) > 0 ) {
531+ if ( ( state [ kState ] & ( kFlowing | kSync | kDataListening ) ) === ( kFlowing | kDataListening ) && state . length === 0 ) {
532532 // Use the guard to avoid creating `Set()` repeatedly
533533 // when we have multiple pipes.
534534 if ( ( state [ kState ] & kMultiAwaitDrain ) !== 0 ) {
@@ -1062,7 +1062,7 @@ function pipeOnDrain(src, dest) {
10621062 }
10631063
10641064 if ( ( ! state . awaitDrainWriters || state . awaitDrainWriters . size === 0 ) &&
1065- src . listenerCount ( 'data' ) ) {
1065+ ( state [ kState ] & kDataListening ) !== 0 ) {
10661066 src . resume ( ) ;
10671067 }
10681068 } ;
@@ -1109,6 +1109,8 @@ Readable.prototype.on = function(ev, fn) {
11091109 const state = this . _readableState ;
11101110
11111111 if ( ev === 'data' ) {
1112+ state [ kState ] |= kDataListening ;
1113+
11121114 // Update readableListening so that resume() may be a no-op
11131115 // a few lines down. This is needed to support once('readable').
11141116 state [ kState ] |= this . listenerCount ( 'readable' ) > 0 ? kReadableListening : 0 ;
@@ -1135,6 +1137,8 @@ Readable.prototype.on = function(ev, fn) {
11351137Readable . prototype . addListener = Readable . prototype . on ;
11361138
11371139Readable . prototype . removeListener = function ( ev , fn ) {
1140+ const state = this . _readableState ;
1141+
11381142 const res = Stream . prototype . removeListener . call ( this ,
11391143 ev , fn ) ;
11401144
@@ -1146,6 +1150,8 @@ Readable.prototype.removeListener = function(ev, fn) {
11461150 // resume within the same tick will have no
11471151 // effect.
11481152 process . nextTick ( updateReadableListening , this ) ;
1153+ } else if ( ev === 'data' && this . listenerCount ( 'data' ) === 0 ) {
1154+ state [ kState ] &= ~ kDataListening ;
11491155 }
11501156
11511157 return res ;
@@ -1184,7 +1190,7 @@ function updateReadableListening(self) {
11841190 state [ kState ] |= kHasFlowing | kFlowing ;
11851191
11861192 // Crude way to check if we should resume.
1187- } else if ( self . listenerCount ( 'data' ) > 0 ) {
1193+ } else if ( ( state [ kState ] & kDataListening ) !== 0 ) {
11881194 self . resume ( ) ;
11891195 } else if ( ( state [ kState ] & kReadableListening ) === 0 ) {
11901196 state [ kState ] &= ~ ( kHasFlowing | kFlowing ) ;
0 commit comments