Skip to content

Commit

Permalink
Merge pull request #171 from clue-labs/drain-throughstream
Browse files Browse the repository at this point in the history
Fix `drain` event of `ThroughStream` to handle potential race condition
WyriHaximus authored Sep 25, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 3ec71bc + 6273e35 commit 78a6ea5
Showing 2 changed files with 70 additions and 7 deletions.
19 changes: 12 additions & 7 deletions src/ThroughStream.php
Original file line number Diff line number Diff line change
@@ -93,16 +93,19 @@ public function __construct($callback = null)

public function pause()
{
$this->paused = true;
// only allow pause if still readable, false otherwise
$this->paused = $this->readable;
}

public function resume()
{
$this->paused = false;

// emit drain event if previous write was paused (throttled)
if ($this->drain) {
$this->drain = false;
$this->emit('drain');
}
$this->paused = false;
}

public function pipe(WritableStreamInterface $dest, array $options = array())
@@ -139,12 +142,13 @@ public function write($data)

$this->emit('data', array($data));

// emit drain event on next resume if currently paused (throttled)
if ($this->paused) {
$this->drain = true;
return false;
}

return true;
// continue writing if still writable and not paused (throttled), false otherwise
return $this->writable && !$this->paused;
}

public function end($data = null)
@@ -164,7 +168,7 @@ public function end($data = null)

$this->readable = false;
$this->writable = false;
$this->paused = true;
$this->paused = false;
$this->drain = false;

$this->emit('end');
@@ -179,9 +183,10 @@ public function close()

$this->readable = false;
$this->writable = false;
$this->closed = true;
$this->paused = true;
$this->paused = false;
$this->drain = false;

$this->closed = true;
$this->callback = null;

$this->emit('close');
58 changes: 58 additions & 0 deletions tests/ThroughStreamTest.php
Original file line number Diff line number Diff line change
@@ -95,6 +95,30 @@ public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused()
$this->assertFalse($ret);
}

/** @test */
public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventEndsStream()
{
$through = new ThroughStream();
$through->on('data', function () use ($through) {
$through->end();
});
$ret = $through->write('foo');

$this->assertFalse($ret);
}

/** @test */
public function itShouldReturnFalseForAnyDataWrittenToItWhenDataEventClosesStream()
{
$through = new ThroughStream();
$through->on('data', function () use ($through) {
$through->close();
});
$ret = $through->write('foo');

$this->assertFalse($ret);
}

/** @test */
public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused()
{
@@ -106,6 +130,40 @@ public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWh
$through->resume();
}

/** @test */
public function itShouldNotEmitDrainOnResumeAfterClose()
{
$through = new ThroughStream();
$through->close();

$through->on('drain', $this->expectCallableNever());
$through->resume();
}

/** @test */
public function itShouldNotEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenThatCausesStreamToClose()
{
$through = new ThroughStream();
$through->on('data', function () use ($through) { $through->close(); });
$through->write('foo');

$through->on('drain', $this->expectCallableNever());
$through->resume();
}

/** @test */
public function itShouldReturnFalseForAnyDataWrittenToItAfterPausingFromDrainEvent()
{
$through = new ThroughStream();
$through->pause();
$through->write('foo');

$through->on('drain', function () use ($through) { $through->pause(); });
$through->resume();

$this->assertFalse($through->write('bar'));
}

/** @test */
public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause()
{

0 comments on commit 78a6ea5

Please sign in to comment.