Skip to content

Commit

Permalink
Merge pull request #30 from yadaiio/Improve_documentation_and_examples
Browse files Browse the repository at this point in the history
Improve documentation and examples
  • Loading branch information
clue authored Apr 4, 2024
2 parents 772af44 + 8b9aca0 commit 577959c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 29 deletions.
26 changes: 20 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ operations, but keeping thousands of jobs in memory at once may easily take up
all resources on your side.
Instead, you can use this library to stream your arbitrarily large input list
as individual records to a non-blocking (async) transformation handler. It uses
[ReactPHP](https://reactphp.org) to enable you to concurrently process multiple
[ReactPHP](https://reactphp.org/) to enable you to concurrently process multiple
records at once. You can control the concurrency limit, so that by allowing
it to process 10 operations at the same time, you can thus process this large
input list around 10 times faster and at the same time you're no longer limited
Expand Down Expand Up @@ -72,21 +72,25 @@ Once [installed](#install), you can use the following code to process an example
user lists by sending a (RESTful) HTTP API request for each user record:

```php
<?php

require __DIR__ . '/vendor/autoload.php';

$browser = new React\Http\Browser();

$concurrency = isset($argv[1]) ? $argv[1] : 3;

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$transformer = new Transformer($concurrency, function ($user) use ($browser) {
$transformer = new Clue\React\Flux\Transformer($concurrency, function ($user) use ($browser) {
// skip users that do not have an IP address listed
if (!isset($user['ip'])) {
return React\Promise\resolve($user);
}

// look up country for this IP
return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
function (ResponseInterface $response) use ($user) {
function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// response successfully received
// add country to user array and return updated user
$user['country'] = (string)$response->getBody();
Expand Down Expand Up @@ -114,7 +118,9 @@ $transformer->on('data', function ($user) {
$transformer->on('end', function () {
echo '[DONE]' . PHP_EOL;
});
$transformer->on('error', 'printf');
$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

```

Expand Down Expand Up @@ -240,8 +246,8 @@ $transformer = new Transformer(10, function ($url) use ($browser) {

return json_decode($response->getBody());
},
function (Exception $error) {
var_dump('There was an error', $error->getMessage());
function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;

throw $error;
}
Expand Down Expand Up @@ -411,6 +417,10 @@ $transformer = new Transformer(10, function ($data) use ($http) {
});

$source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);

$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Keep in mind that the transformation handler may return a rejected promise.
Expand Down Expand Up @@ -456,6 +466,8 @@ $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {

$promise->then(function ($count) {
echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Expand Down Expand Up @@ -561,6 +573,8 @@ $promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {

$promise->then(function (ResponseInterface $response) {
echo 'First successful job: ' . $response->getBody() . PHP_EOL;
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Expand Down
12 changes: 5 additions & 7 deletions examples/01-transform.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -11,7 +8,7 @@

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$transformer = new Transformer($concurrency, function ($user) use ($browser) {
$transformer = new Clue\React\Flux\Transformer($concurrency, function ($user) use ($browser) {
// skip users that do not have an IP address listed
if (!isset($user['ip'])) {
$user['country'] = 'n/a';
Expand All @@ -21,7 +18,7 @@

// look up country for this user's IP
return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
function (ResponseInterface $response) use ($user) {
function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// response successfully received
// add country to user array and return updated user
$user['country'] = (string)$response->getBody();
Expand Down Expand Up @@ -49,5 +46,6 @@ function (ResponseInterface $response) use ($user) {
$transformer->on('end', function () {
echo '[DONE]' . PHP_EOL;
});
$transformer->on('error', 'printf');

$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
10 changes: 3 additions & 7 deletions examples/02-transform-all.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -21,12 +18,12 @@
// each job should use the browser to POST each user object to a certain URL
// process all users by processing all users through transformer
// limit number of concurrent jobs here
$promise = Transformer::all($input, $concurrency, function ($user) use ($browser, $url) {
$promise = Clue\React\Flux\Transformer::all($input, $concurrency, function ($user) use ($browser, $url) {
return $browser->post(
$url,
array('Content-Type' => 'application/json'),
json_encode($user)
)->then(function (ResponseInterface $response) {
)->then(function (Psr\Http\Message\ResponseInterface $response) {
// demo HTTP response validation
$body = json_decode($response->getBody());
if (!isset($body->json)) {
Expand All @@ -40,10 +37,9 @@ function ($count) {
echo 'Successfully processed all ' . $count . ' user records' . PHP_EOL;
},
function (Exception $e) {
echo 'An error occurred: ' . $e->getMessage() . PHP_EOL;
echo 'Error: ' . $e->getMessage() . PHP_EOL;
if ($e->getPrevious()) {
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
}
}
);

10 changes: 3 additions & 7 deletions examples/03-transform-any.php
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -21,12 +18,12 @@
// each job should use the browser to POST each user object to a certain URL
// process all users by processing all users through transformer
// limit number of concurrent jobs here
$promise = Transformer::any($input, $concurrency, function ($user) use ($browser, $url) {
$promise = Clue\React\Flux\Transformer::any($input, $concurrency, function ($user) use ($browser, $url) {
return $browser->post(
$url,
array('Content-Type' => 'application/json'),
json_encode($user)
)->then(function (ResponseInterface $response) use ($user) {
)->then(function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// demo HTTP response validation
$body = json_decode($response->getBody());
if (!isset($body->json)) {
Expand All @@ -44,10 +41,9 @@ function ($user) {
echo 'Successfully processed user record:' . print_r($user, true) . PHP_EOL;
},
function (Exception $e) {
echo 'An error occurred: ' . $e->getMessage() . PHP_EOL;
echo 'Error: ' . $e->getMessage() . PHP_EOL;
if ($e->getPrevious()) {
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
}
}
);

12 changes: 10 additions & 2 deletions src/Transformer.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@
*
* return json_decode($response->getBody());
* },
* function (Exception $error) {
* var_dump('There was an error', $error->getMessage());
* function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
*
* throw $error;
* }
Expand Down Expand Up @@ -256,6 +256,10 @@
* });
*
* $source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);
*
* $transformer->on('error', function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
* Keep in mind that the transformation handler may return a rejected promise.
Expand Down Expand Up @@ -314,6 +318,8 @@ final class Transformer extends EventEmitter implements DuplexStreamInterface
*
* $promise->then(function ($count) {
* echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
* }, function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
Expand Down Expand Up @@ -466,6 +472,8 @@ public static function all(ReadableStreamInterface $input, $concurrency, $callba
*
* $promise->then(function (ResponseInterface $response) {
* echo 'First successful job: ' . $response->getBody() . PHP_EOL;
* }, function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
Expand Down

0 comments on commit 577959c

Please sign in to comment.