Skip to content

Commit

Permalink
Improve type definitions and update to PHPStan level max
Browse files Browse the repository at this point in the history
  • Loading branch information
clue committed May 22, 2024
1 parent 9234da0 commit 5047943
Show file tree
Hide file tree
Showing 24 changed files with 511 additions and 263 deletions.
36 changes: 18 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ from this source stream.
The event receives a single mixed argument for incoming data.

```php
$stream->on('data', function ($data) {
$stream->on('data', function (mixed $data): void {
echo $data;
});
```
Expand Down Expand Up @@ -142,7 +142,7 @@ The `end` event will be emitted once the source stream has successfully
reached the end of the stream (EOF).

```php
$stream->on('end', function () {
$stream->on('end', function (): void {
echo 'END';
});
```
Expand Down Expand Up @@ -180,7 +180,7 @@ trying to read from this stream.
The event receives a single `Exception` argument for the error instance.

```php
$server->on('error', function (Exception $e) {
$server->on('error', function (Exception $e): void {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```
Expand Down Expand Up @@ -213,7 +213,7 @@ stream which should result in the same error processing.
The `close` event will be emitted once the stream closes (terminates).

```php
$stream->on('close', function () {
$stream->on('close', function (): void {
echo 'CLOSED';
});
```
Expand Down Expand Up @@ -312,7 +312,7 @@ Re-attach the data source after a previous `pause()`.
```php
$stream->pause();

Loop::addTimer(1.0, function () use ($stream) {
Loop::addTimer(1.0, function () use ($stream): void {
$stream->resume();
});
```
Expand Down Expand Up @@ -362,7 +362,7 @@ you'll have to manually close the destination stream:

```php
$source->pipe($dest);
$source->on('close', function () use ($dest) {
$source->on('close', function () use ($dest): void {
$dest->end('BYE!');
});
```
Expand Down Expand Up @@ -456,7 +456,7 @@ The `drain` event will be emitted whenever the write buffer became full
previously and is now ready to accept more data.

```php
$stream->on('drain', function () use ($stream) {
$stream->on('drain', function () use ($stream): void {
echo 'Stream is now ready to accept more data';
});
```
Expand All @@ -478,11 +478,11 @@ The event receives a single `ReadableStreamInterface` argument for the
source stream.

```php
$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream) {
$stream->on('pipe', function (ReadableStreamInterface $source) use ($stream): void {
echo 'Now receiving piped data';

// explicitly close target if source emits an error
$source->on('error', function () use ($stream) {
$source->on('error', function () use ($stream): void {
$stream->close();
});
});
Expand All @@ -506,7 +506,7 @@ trying to write to this stream.
The event receives a single `Exception` argument for the error instance.

```php
$stream->on('error', function (Exception $e) {
$stream->on('error', function (Exception $e): void {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```
Expand Down Expand Up @@ -536,7 +536,7 @@ stream which should result in the same error processing.
The `close` event will be emitted once the stream closes (terminates).

```php
$stream->on('close', function () {
$stream->on('close', function (): void {
echo 'CLOSED';
});
```
Expand Down Expand Up @@ -746,7 +746,7 @@ stream in order to stop waiting for the stream to flush its final data.

```php
$stream->end();
Loop::addTimer(1.0, function () use ($stream) {
Loop::addTimer(1.0, function () use ($stream): void {
$stream->close();
});
```
Expand Down Expand Up @@ -831,10 +831,10 @@ readable mode or a stream such as `STDIN`:

```php
$stream = new ReadableResourceStream(STDIN);
$stream->on('data', function ($chunk) {
$stream->on('data', function (string $chunk): void {
echo $chunk;
});
$stream->on('end', function () {
$stream->on('end', function (): void {
echo 'END';
});
```
Expand Down Expand Up @@ -1121,7 +1121,7 @@ used to convert data, for example for transforming any structured data into
a newline-delimited JSON (NDJSON) stream like this:

```php
$through = new ThroughStream(function ($data) {
$through = new ThroughStream(function (mixed $data): string {
return json_encode($data) . PHP_EOL;
});
$through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
Expand All @@ -1133,7 +1133,7 @@ The callback function is allowed to throw an `Exception`. In this case,
the stream will emit an `error` event and then [`close()`](#close-1) the stream.

```php
$through = new ThroughStream(function ($data) {
$through = new ThroughStream(function (mixed $data): string {
if (!is_string($data)) {
throw new \UnexpectedValueException('Only strings allowed');
}
Expand Down Expand Up @@ -1164,7 +1164,7 @@ $stdout = new WritableResourceStream(STDOUT);

$stdio = new CompositeStream($stdin, $stdout);

$stdio->on('data', function ($chunk) use ($stdio) {
$stdio->on('data', function (string $chunk) use ($stdio): void {
$stdio->write('You said: ' . $chunk);
});
```
Expand Down Expand Up @@ -1243,7 +1243,7 @@ If you do not want to run these, they can simply be skipped like this:
vendor/bin/phpunit --exclude-group internet
```

On top of this, we use PHPStan on level 5 to ensure type safety across the project:
On top of this, we use PHPStan on max level to ensure type safety across the project:

```bash
vendor/bin/phpstan
Expand Down
4 changes: 2 additions & 2 deletions examples/01-http.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

$stream = new DuplexResourceStream($resource);

$stream->on('data', function ($chunk) {
$stream->on('data', function (string $chunk): void {
echo $chunk;
});
$stream->on('close', function () {
$stream->on('close', function (): void {
echo '[CLOSED]' . PHP_EOL;
});

Expand Down
4 changes: 2 additions & 2 deletions examples/02-https.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

$stream = new DuplexResourceStream($resource);

$stream->on('data', function ($chunk) {
$stream->on('data', function (string $chunk): void {
echo $chunk;
});
$stream->on('close', function () {
$stream->on('close', function (): void {
echo '[CLOSED]' . PHP_EOL;
});

Expand Down
12 changes: 9 additions & 3 deletions examples/91-benchmark-throughput.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@

$args = getopt('i:o:t:');
$if = $args['i'] ?? '/dev/zero';
assert(is_string($if));
$of = $args['o'] ?? '/dev/null';
assert(is_string($of));
$t = $args['t'] ?? 1;
assert(is_numeric($t));

// passing file descriptors requires mapping paths (https://bugs.php.net/bug.php?id=53465)
$if = str_replace('/dev/fd/', 'php://fd/', $if);
Expand All @@ -38,18 +41,21 @@

// setup input and output streams and pipe inbetween
$fh = fopen($if, 'r');
assert(is_resource($fh));
$fo = fopen($of, 'w');
assert(is_resource($fo));
$in = new React\Stream\ReadableResourceStream($fh);
$out = new React\Stream\WritableResourceStream(fopen($of, 'w'));
$out = new React\Stream\WritableResourceStream($fo);
$in->pipe($out);

// stop input stream in $t seconds
$start = microtime(true);
$timeout = Loop::addTimer($t, function () use ($in) {
$timeout = Loop::addTimer((float) $t, function () use ($in): void {
$in->close();
});

// print stream position once stream closes
$in->on('close', function () use ($fh, $start, $timeout, $info) {
$in->on('close', function () use ($fh, $start, $timeout, $info): void {
$t = microtime(true) - $start;
Loop::cancelTimer($timeout);

Expand Down
2 changes: 1 addition & 1 deletion phpstan.neon.dist
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
parameters:
level: 5
level: max

paths:
- examples/
Expand Down
5 changes: 5 additions & 0 deletions src/CompositeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@

final class CompositeStream extends EventEmitter implements DuplexStreamInterface
{
/** @var ReadableStreamInterface */
private $readable;

/** @var WritableStreamInterface */
private $writable;

/** @var bool */
private $closed = false;

public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
Expand Down
23 changes: 18 additions & 5 deletions src/DuplexResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

final class DuplexResourceStream extends EventEmitter implements DuplexStreamInterface
{
/** @var resource */
private $stream;

/** @var LoopInterface */
Expand All @@ -31,11 +32,20 @@ final class DuplexResourceStream extends EventEmitter implements DuplexStreamInt
* @var int
*/
private $bufferSize;

/** @var WritableStreamInterface */
private $buffer;

/** @var bool */
private $readable = true;

/** @var bool */
private $writable = true;

/** @var bool */
private $closing = false;

/** @var bool */
private $listening = false;

/**
Expand Down Expand Up @@ -78,13 +88,13 @@ public function __construct($stream, ?LoopInterface $loop = null, ?int $readChun
$this->bufferSize = $readChunkSize ?? 65536;
$this->buffer = $buffer;

$this->buffer->on('error', function ($error) {
$this->buffer->on('error', function (\Exception $error): void {
$this->emit('error', [$error]);
});

$this->buffer->on('close', [$this, 'close']);

$this->buffer->on('drain', function () {
$this->buffer->on('drain', function (): void {
$this->emit('drain');
});

Expand Down Expand Up @@ -167,11 +177,14 @@ public function pipe(WritableStreamInterface $dest, array $options = []): Writab
return Util::pipe($this, $dest, $options);
}

/** @internal */
public function handleData($stream)
/**
* @internal
* @param resource $stream
*/
public function handleData($stream): void
{
$error = null;
\set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error): bool {
\set_error_handler(function (int $errno, string $errstr, string $errfile, int $errline) use (&$error): bool {
$error = new \ErrorException(
$errstr,
0,
Expand Down
7 changes: 5 additions & 2 deletions src/ReadableResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ final class ReadableResourceStream extends EventEmitter implements ReadableStrea
*/
private $bufferSize;

/** @var bool */
private $closed = false;

/** @var bool */
private $listening = false;

/**
Expand Down Expand Up @@ -121,10 +124,10 @@ public function close(): void
}

/** @internal */
public function handleData()
public function handleData(): void
{
$error = null;
\set_error_handler(function ($errno, $errstr, $errfile, $errline) use (&$error): bool {
\set_error_handler(function (int $errno, string $errstr, string $errfile, int $errline) use (&$error): bool {
$error = new \ErrorException(
$errstr,
0,
Expand Down
14 changes: 7 additions & 7 deletions src/ReadableStreamInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* The event receives a single mixed argument for incoming data.
*
* ```php
* $stream->on('data', function ($data) {
* $stream->on('data', function (mixed $data): void {
* echo $data;
* });
* ```
Expand Down Expand Up @@ -47,7 +47,7 @@
* reached the end of the stream (EOF).
*
* ```php
* $stream->on('end', function () {
* $stream->on('end', function (): void {
* echo 'END';
* });
* ```
Expand Down Expand Up @@ -84,7 +84,7 @@
* The event receives a single `Exception` argument for the error instance.
*
* ```php
* $stream->on('error', function (Exception $e) {
* $stream->on('error', function (Exception $e): void {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
Expand Down Expand Up @@ -116,7 +116,7 @@
* The `close` event will be emitted once the stream closes (terminates).
*
* ```php
* $stream->on('close', function () {
* $stream->on('close', function (): void {
* echo 'CLOSED';
* });
* ```
Expand Down Expand Up @@ -236,7 +236,7 @@ public function pause(): void;
* ```php
* $stream->pause();
*
* Loop::addTimer(1.0, function () use ($stream) {
* Loop::addTimer(1.0, function () use ($stream): void {
* $stream->resume();
* });
* ```
Expand Down Expand Up @@ -287,7 +287,7 @@ public function resume(): void;
*
* ```php
* $source->pipe($dest);
* $source->on('close', function () use ($dest) {
* $source->on('close', function () use ($dest): void {
* $dest->end('BYE!');
* });
* ```
Expand Down Expand Up @@ -319,7 +319,7 @@ public function resume(): void;
* a `pipe` event with this source stream an event argument.
*
* @param WritableStreamInterface $dest
* @param array $options
* @param array{end?:bool} $options
* @return WritableStreamInterface $dest stream as-is
*/
public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface;
Expand Down
Loading

0 comments on commit 5047943

Please sign in to comment.