Skip to content

Commit

Permalink
Merge branch '0.9' of github.com:chevere/workflow into 0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
rodber committed May 20, 2024
2 parents f5c6956 + f761d94 commit 2024cd1
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 23 deletions.
71 changes: 64 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -467,13 +467,6 @@ $run = run(
image: '/path/to/image-to-upload.png',
savePath: '/path/to/storage/'
);

// Alternative syntax
$variables = [
'image' => '/path/to/image-to-upload.png',
'savePath' => '/path/to/storage/'
];
$run = run($workflow, ...$variables);
```

Use `getReturn` to retrieve a job response as a `CastArgument` object which can be used to get a typed response.
Expand All @@ -482,6 +475,70 @@ Use `getReturn` to retrieve a job response as a `CastArgument` object which can
$thumbFile = $run->getReturn('thumb')->string();
```

### Sync vs Async

Run live example: `php demo/sync-vs-async.php` - [view source](./demo/sync-vs-async.php)

For this example you can compare the execution time between synchronous and asynchronous jobs. The example fetches the content of three URLs using `FetchUrl` action.

```php
use Chevere\Demo\Actions\FetchUrl;
use function Chevere\Workflow\async;
use function Chevere\Workflow\run;
use function Chevere\Workflow\sync;
use function Chevere\Workflow\variable;
use function Chevere\Workflow\workflow;

$sync = workflow(
php: sync(
new FetchUrl(),
url: variable('php'),
),
github: sync(
new FetchUrl(),
url: variable('github'),
),
chevere: sync(
new FetchUrl(),
url: variable('chevere'),
),
);
$async = workflow(
php: async(
new FetchUrl(),
url: variable('php'),
),
github: async(
new FetchUrl(),
url: variable('github'),
),
chevere: async(
new FetchUrl(),
url: variable('chevere'),
),
);
$variables = [
'php' => 'https://www.php.net',
'github' => 'https://github.com/chevere/workflow',
'chevere' => 'https://chevere.org',
];
$time = microtime(true);
$run = run($sync, ...$variables);
$time = microtime(true) - $time;
echo "Time sync: {$time}\n";
$time = microtime(true);
$run = run($async, ...$variables);
$time = microtime(true) - $time;
echo "Time async: {$time}\n";
```

When running sync (blocking) jobs the execution time is higher than async (non-blocking) jobs. This is because async jobs run in parallel.

```plain
Time sync: 2.5507028102875
Time async: 1.5810508728027
```

### Conditional jobs

Run live example: `php demo/run-if.php` - [view source](./demo/run-if.php)
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
],
"require": {
"php": "^8.1",
"amphp/parallel": "^1.4",
"amphp/parallel": "^2.2",
"chevere/action": "^1.0.0",
"chevere/data-structure": "^1.0.1",
"chevere/parameter": "^1.0.x-dev",
Expand Down
30 changes: 30 additions & 0 deletions demo/Actions/FetchUrl.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

/*
* This file is part of Chevere.
*
* (c) Rodolfo Berrios <rodolfo@chevere.org>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Chevere\Demo\Actions;

use Chevere\Action\Action;
use RuntimeException;

class FetchUrl extends Action
{
protected function main(string $url): string
{
$content = file_get_contents($url);
if ($content === false) {
throw new RuntimeException('Error fetching URL');
}

return $content;
}
}
2 changes: 1 addition & 1 deletion demo/image-resize.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
$graph = $run->workflow()->jobs()->graph()->toArray();
echo "Workflow graph:\n";
foreach ($graph as $level => $jobs) {
echo " {$level}: [" . implode('|', $jobs) . "]\n";
echo " {$level}: " . implode('|', $jobs) . "\n";
}
echo <<<PLAIN
thumbFile: {$run->getReturn('thumb')->string()}
Expand Down
66 changes: 66 additions & 0 deletions demo/sync-vs-async.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

/*
* This file is part of Chevere.
*
* (c) Rodolfo Berrios <rodolfo@chevere.org>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

use Chevere\Demo\Actions\FetchUrl;
use function Chevere\Workflow\async;
use function Chevere\Workflow\run;
use function Chevere\Workflow\sync;
use function Chevere\Workflow\variable;
use function Chevere\Workflow\workflow;

require 'loader.php';

/*
php demo/sync-vs-async.php
*/
$sync = workflow(
php: sync(
new FetchUrl(),
url: variable('php'),
),
github: sync(
new FetchUrl(),
url: variable('github'),
),
chevere: sync(
new FetchUrl(),
url: variable('chevere'),
),
);
$async = workflow(
php: async(
new FetchUrl(),
url: variable('php'),
),
github: async(
new FetchUrl(),
url: variable('github'),
),
chevere: async(
new FetchUrl(),
url: variable('chevere'),
),
);
$variables = [
'php' => 'https://www.php.net',
'github' => 'https://github.com/chevere/workflow',
'chevere' => 'https://chevere.org',
];
$time = microtime(true);
$run = run($sync, ...$variables);
$time = microtime(true) - $time;
echo "Time sync: {$time}\n";
$time = microtime(true);
$run = run($async, ...$variables);
$time = microtime(true) - $time;
echo "Time async: {$time}\n";
45 changes: 45 additions & 0 deletions src/CallableTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

/*
* This file is part of Chevere.
*
* (c) Rodolfo Berrios <rodolfo@chevere.org>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Chevere\Workflow;

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

/**
* @template-implements Task<mixed, never, never>
*/
final class CallableTask implements Task
{
private string $callable;

/**
* @var array<mixed>
*/
private array $arguments;

public function __construct(
string $callable,
mixed ...$arguments
) {
$this->callable = $callable;
$this->arguments = $arguments;
}

public function run(Channel $channel, Cancellation $cancellation): mixed
{
// @phpstan-ignore-next-line
return ($this->callable)(...$this->arguments);
}
}
32 changes: 18 additions & 14 deletions src/Runner.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Chevere\Workflow;

use Amp\Promise;
use Amp\Parallel\Worker\Execution;
use Chevere\Action\Interfaces\ActionInterface;
use Chevere\Parameter\Interfaces\CastInterface;
use Chevere\Workflow\Interfaces\JobInterface;
Expand All @@ -24,9 +24,8 @@
use InvalidArgumentException;
use OutOfBoundsException;
use Throwable;
use function Amp\Parallel\Worker\enqueueCallable;
use function Amp\Promise\all;
use function Amp\Promise\wait;
use function Amp\Future\await;
use function Amp\Parallel\Worker\submit;
use function Chevere\Message\message;
use function Chevere\Parameter\cast;

Expand Down Expand Up @@ -54,9 +53,12 @@ public function withRun(): RunnerInterface

continue;
}
$promises = $new->getPromises($node);
$executions = $new->getExecutions($node);
/** @var RunnerInterface[] $responses */
$responses = wait(all($promises));
$responses = await(array_map(
fn (Execution $e) => $e->getFuture(),
$executions,
));
foreach ($responses as $runner) {
$new->merge($new, $runner);
}
Expand Down Expand Up @@ -178,20 +180,22 @@ private function addJobSkip(string $name): void

/**
* @param array<string> $queue
* @return array<Promise<mixed>>
* @return array<Execution<mixed, never, never>>
*/
private function getPromises(array $queue): array
private function getExecutions(array $queue): array
{
$promises = [];
$return = [];
foreach ($queue as $job) {
$promises[] = enqueueCallable(
'Chevere\\Workflow\\runnerForJob',
$this,
$job,
$return[] = submit(
new CallableTask(
'Chevere\\Workflow\\runnerForJob',
$this,
$job,
)
);
}

return $promises;
return $return;
}

private function merge(self $self, RunnerInterface $runner): void
Expand Down

0 comments on commit 2024cd1

Please sign in to comment.