The Async
component brings concurrency into PHP using cooperative multitasking.
Note
The Async component is built on top of RevoltPHP, which makes it compatible with Amphp, and other libraries that use the same event loop.
use Psl\IO;
use Psl\Async;
use Psl\Shell;
Async\main(static function(): int {
$watcher = Async\Scheduler::onSignal(SIGINT, function (): never {
IO\write_error_line('SIGINT received, stopping...');
exit(0);
});
Async\Scheduler::unreference($watcher);
IO\write_error_line('Press Ctrl+C to stop');
Async\concurrently([
static fn(): string => Shell\execute('sleep', ['3']),
static fn(): string => Shell\execute('echo', ['Hello World!']),
static fn(): string => Shell\execute('echo', ['Hello World!']),
]);
IO\write_error_line('Done!');
return 0;
});
-
main((Closure(): int)|(Closure(): Awaitable<int> $closure): never
Execute
$closure
in{main}
fiber, then exit with returned exit code.If
$closure
returns anAwaitable
, it MUST resolve with an exit code.After executing
$closure
, the event loop will keep running until there's no more callbacks to be executed.
use Psl\Async; use Psl\IO; Async\main(static function(): int { Async\Scheduler::delay(1.0, static function(): void { IO\write_line('hello'); }); return 0; }); // Output: // hello
-
run<T>((Closure(): T) $closure): Async\Awaitable<T>
Create a new fiber asynchronously using the given closure, and return an
Awaitable
that resolves to the result of the closure.If the closure throws an exception, the
Awaitable
will fail with that exception.
use Psl\Async; $awaitable = Async\run(static function (): string { Async\sleep(1); return 'Hello world!'; }); $result = $awaitable->await(); // 'Hello world!'
use Psl\Async; $awaitable = Async\run(static function (): string { throw new Exception('Something went wrong!'); }); try { $result = $awaitable->await(); // throws Exception } catch (Exception $e) { // ... handle exception }
-
await<T>(Awaitable<T> $awaitable): T
Await the given
Awaitable
, and return the result.If the
Awaitable
fails, the exception will be thrown.
use Psl\Async; $awaitable = Async\run(static function (): string { return 'Hello world!'; }); $result = Async\await($awaitable); // 'Hello world!'
-
all<Tk of array-key, Tv>(iterable<Tk, Awaitable<Tv>> $awaitables): array<Tk, Tv>
Awaits all
Awaitable
s to complete concurrently.If one
Awaitable
fails, the exception will be thrown immediately, and the result of theAwaitable
s will be ignored.Once the
Awaitable
s have completed, an array containing the results will be returned preserving the originalAwaitable
s order.If multiple
Awaitable
s failed at once,Exception\CompositeException
will be thrown.
use Psl\Async; use Psl\Shell; Async\all([ Async\run(static fn() => Shell\execute('php', ['vendor/bin/phpunit', '-c', 'phpunit.xml.dist'])), Async\run(static fn() => Shell\execute('php', ['vendor/bin/psalm', '-c', 'psalm.xml'])), Async\run(static fn() => Shell\execute('php', ['vendor/bin/psalm', '-c', 'psalm.xml', '--taint-analysis'])), Async\run(static fn() => Shell\execute('php', ['vendor/bin/php-cs-fixer', 'fix', '--config=.php_cs.dist.php', '--dry-run'])), Async\run(static fn() => Shell\execute('php', ['vendor/bin/phpcs', '--standard=.phpcs.xml'])), ]); try { $result = Async\all([ Async\Awaitable::error(new Exception('Something went wrong!')), Async\Awaitable::complete('hello'), ]); } catch (Exception $e) { // ... handle the exception } try { $result = Async\all([ Async\Awaitable::error(new Exception('Something went wrong!')), Async\Awaitable::error(new Exception('Something else went wrong!')), ]); } catch (Async\Exception\CompositeException $e) { $reasons = $e->getReasons(); // [Exception, Exception] // ... handle the exceptions }
-
any<T>(iterable<Awaitable<T>> $awaitables): T
Return the first successfully completed
Awaitable
result.If you want the first
Awaitable
completed, successful or not, usefirst(...)
instead.If multiple
Awaitable
s completed successfully at once, the first one will be returned.If
$awaitables
is empty,Psl\Exception\InvariantViolationException
will be thrown.If all
Awaitable
s failed,Exception\CompositeException
will be thrown.
use Psl; use Psl\Async; $result = Async\any([ Async\Awaitable::error(new Exception('Something went wrong!')), Async\Awaitable::complete('hello'), ]); Psl\invariant($result === 'hello', 'Should be hello!'); try { $result = Async\any([]); } catch (Psl\Exception\InvariantViolationException $e) { // ... handle the exception } try { $result = Async\any([ Async\Awaitable::error(new Exception('Something went wrong!')), Async\Awaitable::error(new Exception('Something else went wrong!')), ]); } catch (Async\Exception\CompositeException $e) { $reasons = $e->getReasons(); // [Exception, Exception] // ... handle the exceptions }
-
first<T>(iterable<Awaitable<T>> $awaitables): T
Return the first completed
Awaitable
result, or throw an exception if the firstAwaitable
failed.If you want the first
Awaitable
completed without an error, useany(...)
instead.If
$awaitables
is empty,Psl\Exception\InvariantViolationException
will be thrown.If all
Awaitable
s failed,Exception\CompositeException
will be thrown.
use Psl; use Psl\Async; $result = Async\first([ Async\Awaitable::complete('hello'), Async\Awaitable::error(new Exception('Something went wrong!')), ]); Psl\invariant($result === 'hello', 'Should be hello!'); try { $result = Async\first([ Async\Awaitable::error(new Exception('Something went wrong!')), Async\Awaitable::complete('hello'), ]); } catch (Exception $e) { // ... handle the exception } try { $result = Async\first([]); } catch (Psl\Exception\InvariantViolationException $e) { // ... handle the exception } try { $result = Async\first([ Async\Awaitable::error(new Exception('Something went wrong!')), Async\Awaitable::error(new Exception('Something else went wrong!')), ]); } catch (Async\Exception\CompositeException $e) { $reasons = $e->getReasons(); // [Exception, Exception] // ... handle the exceptions }
-
series<Tk of array-key, Tv>(iterable<Tk, (Closure(): Tv)> $tasks): array<Tk, Tv>
Run the functions in the tasks' iterable in series, each one running once the previous function has completed.
If any functions in the series throws, no more functions are run, and the exception is immediately thrown.
use Psl\Async; $results = Async\series([ create_users(...), create_organizations(...), create_roles(...), create_user_organization_roles(...), ]);
-
concurrently<Tk of array-key, Tv>(iterable<Tk, (Closure(): Tv)> $tasks): array<Tk, Tv>
Run the functions in the tasks' iterable in parallel, without waiting until the previous function has completed.
If any functions in the tasks' iterable throws, no more functions are run, and the exception is immediately thrown.
Once the tasks have completed, the results are returned as an array, preserving the original keys, in the order in which the tasks were passed.
If
$tasks
is empty, an empty array will be returned.If multiple tasks throw at once,
Exception\CompositeException
will be thrown.
use Psl\Async; $results = Async\concurrently([ create_users(...), create_organizations(...), create_roles(...), create_user_organization_roles(...), ]);
Warning
concurrently(...)
is about kicking-off I/O functions concurrently, not about concurrently execution of code. If your functions do not use any timers or perform any non-blocking I/O, they will actually be executed in series.
use Psl\Async; use function file_get_contents; // the following runs in series, as `file_get_contents` is blocking. Async\concurrently([ static fn() => file_get_contents('/etc/hosts'), static fn() => file_get_contents('/etc/resolv.conf'), ]);
Note
use
Psl\Result\reflect(...)
to continue the execution of other functions when a function fails.
use Psl; use Psl\Async; use Psl\Result; use Psl\Shell; [$version, $foo] = Async\concurrently([ Result\reflect(static fn() => Shell\execute('php', ['-v'])), Result\reflect(static fn() => Shell\execute('php', ['-r', 'foo();'])), ]); Psl\invariant($version->isSucceeded(), '`$ php -v` should have succeeded.'); Psl\invariant($foo->isFailed(), '`$ php -r "foo();"` should have failed.');
-
Non-blocking sleep for the specified number of seconds.
use Psl; use Psl\Async; use function time; $time = time(); Async\concurrently([ static fn() => Async\sleep(2), static fn() => Async\sleep(2), static fn() => Async\sleep(2), ]); $duration = time() - $time; Psl\invariant(2 <= $duration < 3, 'Should sleep for 2 seconds.');
-
Reschedule the work of an async function until some other time in the future.
The common use case for this is if your async function actually has to wait for some blocking call, you can tell other callbacks in the async scheduler that they can work while this one waits for the blocking call to finish (e.g., maybe in a polling situation or something).
use Psl; use Psl\Async; $deferred = new Async\Deferred(); // defer the execution of the callback until the next tick. Async\Schedule::defer(static fn() => $deferred->complete('hello')); Psl\invariant(!$deferred->isCompleted(), 'Deferred should not be completed yet.'); Async\later(); Psl\invariant($deferred->isComplete(), 'Deferred should be complete.');
-
final class Awaitable<T> implements Promise\PromiseInterface<T>
An
Awaitable
is a promise that can be awaited.It can be used to wait for the result of an async function, or to wait for the result of a blocking call.
It can also be used to wait for the result of a
Deferred
.
use Psl\Async; $awaitable = Async\run(static fn(): string => 'hello'); $result = $awaitable->await(); // 'hello'
-
static Awaitable::complete<Tv>(Tv $result): Awaitable<Tv>
Create an
Awaitable
that completes with the given value.
use Psl\Async; $awaitable = Async\Awaitable::complete('hello'); $result = $awaitable->await(); // 'hello'
-
static Awaitable::error(Exception $exception): Awaitable<never>
Create an
Awaitable
that fails with the given$exception
.
use Psl\Async; $awaitable = Async\Awaitable::error(new Exception('Something went wrong!')); try { $awaitable->await(); } catch (Exception $e) { // handle the exception }
-
static Awaitable::iterate<Tk, Tv>(iterable<Tk, Awaitable<Tv>> $awaitables): Generator<Tk, Awaitable<Tv>, _, _>
Iterate over the given
Awaitable
s in completion order.
use Psl\IO; use Psl\Async; $handles = [ Async\run(static function() { Async\sleep(1); return 'a'; }), Async\run(static function() { return 'b'; }), Async\run(static function() { Async\sleep(0.3); return 'c'; }), Async\run(static function() { Async\sleep(0.1); return 'd'; }), ]; foreach(Async\Awaitable::iterate($handles) as $k => $awaitable) { $result = $awaitable->await(); IO\writeLine($k . ': ' . $result); } // Output: // 1: b // 3: d // 2: c // 0: a
-
Awaitable::isComplete(): bool
Returns
true
if theAwaitable
is complete.
use Psl\Async; $awaitable = Async\Awaitable::complete('hello'); Psl\invariant($awaitable->isComplete(), 'Should be complete.'); $awaitable = Async\run(static fn() => Async\sleep(2)); Psl\invariant(!$awaitable->isComplete(), 'Should not be complete.');
-
Awaitable::await(): T
Await the result of the awaitable.
use Psl\Async; $awaitable = Async\run(static function(): string { Async\sleep(2); return 'hello'; }); Psl\invariant($awaitable->await() === 'hello', 'Should be "hello".');
-
Awaitable::then<Ts>((Closure(T): Ts) $success, (Closure(Exception): Ts) $failure): Awaitable<Ts>
Attaches callbacks that are invoked when this
Awaitable
is completed.The returned
Awaitable
is resolved with the return value of the callback, or rejected with an exception thrown from the callback.
use Psl; use Psl\Async; use Psl\Str; $awaitable = Async\run(static function(): string { return 'hello'; }); $awaitable = $awaitable->then( static fn($result) => Str\format('%s world', $result), static fn($error) => Psl\invariant_violation('Should not throw.'), ); $result = $awaitable->await(); // 'hello world'
-
Awaitable::map<Ts>((Closure(T): Ts) $success): Awaitable<Ts>
Attaches a callback that is invoked if this
Awaitable
is completed successfully.The returned
Awaitable
is resolved with the return value of the callback, or rejected with an exception thrown from the callback.
use Psl\Async; use Psl\Str; $awaitable = Async\run(static function(): string { return 'hello'; }); $awaitable = $awaitable ->map(static fn($result) => Str\format('%s world', $result)); $result = $awaitable->await(); // 'hello world'
-
Awaitable::catch<Ts>((Closure(Exception): Ts) $failure): Awaitable<T|Ts>
Attaches a callback that is invoked if this
Awaitable
is completed with an error.The returned
Awaitable
is resolved with the return value of the callback, or rejected with an exception thrown from the callback.
use Psl\Async; $awaitable = Async\run(static function(): string { throw new Exception('Something went wrong!'); }); $awaitable = $awaitable ->catch(static fn($error) => $error->getMessage()); $result = $awaitable->await(); // 'Something went wrong!'
-
Awaitable::always((Closure(): void) $always): Awaitable<T>
Attaches a callback that is invoked when this
Awaitable
is completed.
use Psl\IO; use Psl\Async; $awaitable = Async\run(static function(): string { return 'hello'; }); $awaitable = $awaitable->always(static function(): void { IO\write_line('done'); }); $result = $awaitable->await(); // 'hello' // Output: // done
-
Awaitable::ignore(): this
Do not forward unhandled errors to the event loop handler.
use Psl\Async; Async\run(static function(): string { throw new Exception('Something went wrong!'); })->ignore(); // No exception thrown Async\Scheduler::run();
-
-
final class Semaphore<Tin, Tout>
Run an operation with a limit on number of ongoing asynchronous jobs.
All operations must have the same input type (
Tin
) and output type (Tout
), and be processed by the same function.Tin
may be a closure invoked by the$operation
for maximum flexibility, however this pattern is best avoided in favor of creating semaphores with a more narrow process.
use Psl\Async; use Psl\IO; $semaphore = new Async\Semaphore(2, static function(int $input): void { IO\write_error_line('> started : %d', $input); Async\sleep(1); IO\write_error_line('> finished: %d', $input); }); Async\concurrently([ fn() => $semaphore->waitFor(1), fn() => $semaphore->waitFor(2), fn() => $semaphore->waitFor(3), ]); // Output: // > started: 1 // > started: 2 // > finished: 1 // > started: 3 // > finished: 2 // > finished: 3
-
Semaphore::waitFor(Tin $input): Tout
Run the operation using the given
$input
.If the concurrency limit has been reached, this method will wait until one of the ingoing operations has completed.
use Psl\Async; $semaphore = new Async\Semaphore(2, static function(int $input): void { Async\sleep(1); return $input + 1; }); $handles = []; $handles[] = Async\run(fn() => $semaphore->waitFor(1)); $handles[] = Async\run(fn() => $semaphore->waitFor(2)); $handles[] = Async\run(fn() => $semaphore->waitFor(3)); $results = Async\all($handles); // [2, 3, 4]
-
Semaphore::cancel(Exception $exception): void
Cancel all pending operations.
Any pending operation will fail with the given exception.
Future operations will continue execution as usual.
use Psl\Async; $semaphore = new Async\Semaphore(1, static function(int $input): void { Async\sleep(1); return $input + 1; }); $one = Async\run(fn() => $semaphore->waitFor(1)); $two = Async\run(fn() => $semaphore->waitFor(2)); $semaphore->cancel(new Exception('foo')); $one->await(); // 2 $two->await(); // throws `Exception` with message `foo`
-
Semaphore::getConcurrencyLimit(): positive-int
Get the concurrency limit of this semaphore.
use Psl\Async; $semaphore = new Async\Semaphore(2, static fn(int $input): void => $input + 1); $semaphore->getConcurrencyLimit(); // 2
-
Semaphore::getPendingOperations(): int<0, max>
Get the number of pending operations.
use Psl\Async; $semaphore = new Async\Semaphore(2, static fn(int $input): void => $input + 1); $semaphore->getPendingOperations(); // 0 $one = Async\run(fn() => $semaphore->waitFor(1)); $two = Async\run(fn() => $semaphore->waitFor(2)); $three = Async\run(fn() => $semaphore->waitFor(3)); $four = Async\run(fn() => $semaphore->waitFor(4)); $semaphore->getPendingOperations(); // 2 $one->await(); $semaphore->getPendingOperations(); // 1 $two->await(); $semaphore->getPendingOperations(); // 0
-
Semaphore::hasPendingOperations(): bool
Check if there are any pending operations.
use Psl\Async; $semaphore = new Async\Semaphore(2, static fn(int $input): void => $input + 1); $semaphore->hasPendingOperations(); // false $one = Async\run(fn() => $semaphore->waitFor(1)); $two = Async\run(fn() => $semaphore->waitFor(2)); $three = Async\run(fn() => $semaphore->waitFor(3)); $four = Async\run(fn() => $semaphore->waitFor(4)); $semaphore->hasPendingOperations(); // true $one->await(); $semaphore->hasPendingOperations(); // true $two->await(); $semaphore->hasPendingOperations(); // false
-
Semaphore::getIngoingOperations(): int<0, max>
Get the number of ingoing operations.
use Psl\Async; $semaphore = new Async\Semaphore(2, static function(int $input): void { Async\sleep(1); return $input + 1; }); $semaphore->getIngoingOperations(); // 0 $one = Async\run(fn() => $semaphore->waitFor(1)); $semaphore->getIngoingOperations(); // 1 $two = Async\run(fn() => $semaphore->waitFor(2)); $semaphore->getIngoingOperations(); // 2 $one->await(); $semaphore->getIngoingOperations(); // 1 $two->await(); $semaphore->getIngoingOperations(); // 0
-
Semaphore::hasIngoingOperations(): bool
Check if there are any ingoing operations.
use Psl\Async; $semaphore = new Async\Semaphore(2, static function(int $input): void { Async\sleep(1); return $input + 1; }); $semaphore->hasIngoingOperations(); // false $one = Async\run(fn() => $semaphore->waitFor(1)); $semaphore->hasIngoingOperations(); // true $two = Async\run(fn() => $semaphore->waitFor(2)); $semaphore->hasIngoingOperations(); // true $one->await(); $semaphore->hasIngoingOperations(); // true $two->await(); $semaphore->hasIngoingOperations(); // false
-
Semaphore::waitForPending(): void
Wait until all pending operations have completed.
use Psl\Async; $semaphore = new Async\Semaphore(1, static function(int $input): void { Async\sleep(1); return $input + 1; }); $one = Async\run(fn() => $semaphore->waitFor(1)); $two = Async\run(fn() => $semaphore->waitFor(2)); $semaphore->hasPendingOperations(); // true $semaphore->waitForPending(); $semaphore->hasPendingOperations(); // false
-
-
final class KeyedSemaphore<Tk of array-key, Tin, Tout>
Run an operation with a limit on number of ongoing asynchronous jobs for a specific key.
All operations must have the same input type (
Tin
) and output type (Tout
), and be processed by the same function.Tin
may be a closure invoked by the$operation
for maximum flexibility, however this pattern is best avoided in favor of creating semaphores with a more narrow process.
use Psl\Async; use Psl\IO; $semaphore = new Async\KeyedSemaphore(2, static function(string $key, int $input): void { IO\write_error_line('> started(%s): %d', $key, $input); Async\sleep(1); IO\write_error_line('> finished(%s): %d', $key, $input); }); Async\concurrently([ fn() => $semaphore->waitFor('foo', 1), fn() => $semaphore->waitFor('foo', 2), fn() => $semaphore->waitFor('foo', 3), fn() => $semaphore->waitFor('bar', 1), fn() => $semaphore->waitFor('bar', 2), fn() => $semaphore->waitFor('bar', 3), ]); // Output: // > started(foo): 1 // > started(foo): 2 // > started(bar): 1 // > started(bar): 2 // > finished(foo): 1 // > finished(bar): 1 // > started(foo): 3 // > started(bar): 3 // > finished(foo): 2 // > finished(bar): 2 // > finished(foo): 3 // > finished(bar): 3
-
KeyedSemaphore::waitFor(Tk $key, Tin $input): Tout
Run the operation using the given
$input
.If the concurrency limit has been reached for the given
$key
, this method will wait until one of the ingoing operations has completed.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(2, static function(string $key, int $input): void { Async\sleep(1); return $input + 1; }); $handles = []; $handles[] = Async\run(fn() => $semaphore->waitFor('foo', 1)); $handles[] = Async\run(fn() => $semaphore->waitFor('foo', 2)); $handles[] = Async\run(fn() => $semaphore->waitFor('foo', 3)); $handles[] = Async\run(fn() => $semaphore->waitFor('bar', 4)); $handles[] = Async\run(fn() => $semaphore->waitFor('bar', 5)); $handles[] = Async\run(fn() => $semaphore->waitFor('bar', 6)); $results = Async\all($handles); // [1, 2, 4, 5, 3, 6]
-
KeyedSemaphore::cancel(string $key, Exception $exception): void
Cancel pending operations for the given key.
Any pending operation will fail with the given exception.
Future operations will continue execution as usual.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static function(string $_key, int $input): void { Async\sleep(1); return $input + 1; }); $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $semaphore->cancel('foo', new Exception('foo')); $one->await(); // 2 $two->await(); // throws `Exception` with message `foo`
-
KeyedSemaphore::cancelAll(Exception $exception): void
Cancel all pending operations.
Any pending operation will fail with the given exception.
Future operations will continue execution as usual.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static function(string $_key, int $input): void { Async\sleep(1); return $input + 1; }); $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('bar', 2)); $semaphore->cancelAll(new Exception('foo')); $one->await(); // throws `Exception` with message `foo` $two->await(); // throws `Exception` with message `foo`
-
KeyedSemaphore::getConcurrencyLimit(): positive-int
Returns the concurrency limit for the semaphore.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(10, static fn(string $_key, int $input): void => $input + 1); $semaphore->getConcurrencyLimit(); // 10
-
KeyedSemaphore::getPendingOperations(Tk $key): int<0, max>
Returns the number of pending operations for the given key.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $semaphore->getPendingOperations('foo'); // 0 $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $three = Async\run(fn() => $semaphore->waitFor('foo', 3)); $semaphore->getPendingOperations('foo'); // 2 $semaphore->getPendingOperations('bar'); // 0 $one->await(); // 2 $semaphore->getPendingOperations('foo'); // 1 $two->await(); // 3 $semaphore->getPendingOperations('foo'); // 0 $three->await(); // 4 $semaphore->getPendingOperations('foo'); // 0
-
KeyedSemaphore::getTotalPendingOperations(): int<0, max>
Returns the total number of pending operations.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $three = Async\run(fn() => $semaphore->waitFor('bar', 3)); $four = Async\run(fn() => $semaphore->waitFor('bar', 4)); $semaphore->getTotalPendingOperations(); // 2
-
KeyedSemaphore::hasPendingOperations(Tk $key): bool
Check if there's any operations pending execution for the given key.
If this method returns
true
, it means the semaphore has reached it's limits, future calls towaitFor
will wait.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $semaphore->hasPendingOperations('foo'); // false $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $semaphore->hasPendingOperations('foo'); // true
-
KeyedSemaphore::hasAnyPendingOperations(): bool
Check if there's any operations pending execution.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $semaphore->hasAnyPendingOperations(); // false $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $semaphore->hasAnyPendingOperations(); // true
-
KeyedSemaphore::getIngoingOperations(Tk $key): int<0, max>
Returns the number of operations that are currently being executed for the given key.
The returned number will always be lower, or equal to the concurrency limit.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $semaphore->getIngoingOperations('foo'); // 0 $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $three = Async\run(fn() => $semaphore->waitFor('foo', 3)); $semaphore->getIngoingOperations('foo'); // 1 $semaphore->getIngoingOperations('bar'); // 0 $one->await(); // 2 $semaphore->getIngoingOperations('foo'); // 1 $two->await(); // 3 $semaphore->getIngoingOperations('foo'); // 1 $three->await(); // 4 $semaphore->getIngoingOperations('foo'); // 0
-
KeyedSemaphore::getTotalIngoingOperations(): int<0, max>
Returns the total number of operations that are currently being executed.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $semaphore->getTotalIngoingOperations(); // 0 $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $three = Async\run(fn() => $semaphore->waitFor('bar', 3)); $four = Async\run(fn() => $semaphore->waitFor('bar', 4)); $semaphore->getTotalIngoingOperations(); // 2
-
KeyedSemaphore::hasIngoingOperations(Tk $key): bool
Check if there's any operations currently being executed for the given key.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $semaphore->hasIngoingOperations('foo'); // false $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $semaphore->hasIngoingOperations('foo'); // true
-
KeyedSemaphore::hasAnyIngoingOperations(): bool
Returns
true
if there's any operations currently being executed,false
otherwise.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $semaphore->hasAnyIngoingOperations(); // false $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $semaphore->hasAnyIngoingOperations(); // true
-
KeyedSemaphore::waitForPending(Tk $key): void
Wait for all pending operations for the given key to complete.
use Psl\Async; $semaphore = new Async\KeyedSemaphore(1, static fn(string $_key, int $input): void => $input + 1); $one = Async\run(fn() => $semaphore->waitFor('foo', 1)); $two = Async\run(fn() => $semaphore->waitFor('foo', 2)); $semaphore->waitForPending('foo'); // waits for $one and $two to complete.
-
-
final class Sequence<Tin, Tout>
Run an operation with a limit on number of ongoing asynchronous jobs of 1.
Just like
Semaphore
, all operations must have the same input typeTin
and output typeTout
, and be processed by the same function.
use Psl\Async; use Psl\IO; $sequence = new Async\Sequence(static function(int $input): void { IO\write_error_line('> started : %d', $input); Async\sleep(1); IO\write_error_line('> finished: %d', $input); }); Async\concurrently([ fn() => $sequence->waitFor(1), fn() => $sequence->waitFor(2), fn() => $sequence->waitFor(3), ]); // Output: // > started: 1 // > finished: 1 // > started: 2 // > finished: 2 // > started: 3 // > finished: 3
-
Sequence::waitFor(Tin $input): Tout
Run the operation using the given
$input
, after all previous operations have completed.
use Psl\Async; $s = new Async\Sequence(static function(int $input): void { Async\sleep(1); return $input + 1; }); $results = Async\concurrently([fn() => $s->waitFor(1), fn() => $s->waitFor(2), fn() => $s->waitFor(3)]); // [2, 3, 4]
-
Sequence::cancel(Exception $exception): void
Cancel all pending operations.
Any pending operation will fail with the given exception.
Future operations will continue execution as usual.
use Psl\Async; $s = new Async\Sequence(static function(int $input): void { Async\sleep(1); return $input + 1; }); $one = Async\run(fn() => $s->waitFor(1)); $two = Async\run(fn() => $s->waitFor(2)); $s->cancel(new Exception('foo')); $one->await(); // 2 $two->await(); // throws `Exception` with message `foo`
-
Sequence::getPendingOperations(): int<0, max>
Returns the number of operations that are currently pending.
use Psl\Async; $s = new Async\Sequence(static function(int $input): void { Async\sleep(1); return $input + 1; }); $s->getPendingOperations(); // 0 $one = Async\run(fn() => $s->waitFor(1)); $two = Async\run(fn() => $s->waitFor(2)); $s->getPendingOperations(); // 1
-
Sequence::hasPendingOperations(): bool
Returns
true
if there's any operations currently pending,false
otherwise.If this method returns
true
, it means future calls towaitFor
will wait.
use Psl\Async; $s = new Async\Sequence(static function(int $input): void { Async\sleep(1); return $input + 1; }); $s->hasPendingOperations(); // false $one = Async\run(fn() => $s->waitFor(1)); $s->hasPendingOperations(); // true
-
Sequence::hasIngoingOperations(): bool
Check if the sequence has any ingoing operations.
If this method returns
true
, it means future calls towaitFor
will wait. If this method returnsfalse
, it means future calls towaitFor
will execute immediately.
use Psl\Async; $s = new Async\Sequence(static function(int $input): void { Async\sleep(1); return $input + 1; }); $s->hasIngoingOperations(); // false $one = Async\run(fn() => $s->waitFor(1)); $s->hasIngoingOperations(); // true $one->await(); // 2 $s->hasIngoingOperations(); // false
-
Sequence::waitForPending(): void
Wait for all pending operations to complete.
use Psl\Async; $s = new Async\Sequence(static function(int $input): void { Async\sleep(1); return $input + 1; }); $one = Async\run(fn() => $s->waitFor(1)); $two = Async\run(fn() => $s->waitFor(2)); $s->waitForPending(); // waits for $one and $two to complete.
-
-
final class KeyedSequence<Tk of array-key, Tin, Tout>
Run an operation with a limit on number of ongoing asynchronous jobs of 1.
Just like
KeyedSemaphore
, all operations must have the same input typeTin
and output typeTout
, and be processed by the same function.
use Psl\Async; use Psl\IO; $sequence = new Async\KeyedSequence(1, static function(string $key, int $input): void { IO\write_error_line('> started(%s): %d', $key, $input); Async\sleep(1); IO\write_error_line('> finished(%s): %d', $key, $input); }); Async\concurrently([ fn() => $sequence->waitFor('foo', 1), fn() => $sequence->waitFor('foo', 2), fn() => $sequence->waitFor('foo', 3), fn() => $sequence->waitFor('bar', 1), ]); // Output: // > started(foo): 1 // > started(bar): 1 // > finished(foo): 1 // > finished(bar): 1 // > started(foo): 2 // > finished(foo): 2 // > started(foo): 3 // > finished(foo): 3
-
KeyedSequence::waitFor(Tk $key, Tin $input): Tout
Run the operation using the given
$input
, after all previous operations have completed.If the given
$key
is already in use, the operation will wait until the previous operation with the same$key
has completed.
use Psl\Async; $s = new Async\KeyedSequence(1, static function(string $_key, int $input): void { Async\sleep(1); return $input + 1; }); $results = Async\concurrently([fn() => $s->waitFor('foo', 1), fn() => $s->waitFor('foo', 2), fn() => $s->waitFor('foo', 3)]); // [2, 3, 4]
-
KeyedSequence::cancel(Tk $key, Exception $exception): void
Cancel all pending operations.
Any pending operation will fail with the given exception.
Future operations will continue execution as usual.
use Psl\Async; $s = new Async\KeyedSequence(1, static function(string $_key, int $input): void { Async\sleep(1); return $input + 1; }); $one = Async\run(fn() => $s->waitFor('foo', 1)); $two = Async\run(fn() => $s->waitFor('foo', 2)); $s->cancel(new Exception('foo')); $one->await(); // 2 try { $two->await(); } catch (Exception $e) { echo $e->getMessage(); // foo }
-
KeyedSequence::cancelAll(Exception $exception): void
Cancel all pending operations.
Pending operation will fail with the given exception.
Future operations will continue execution as usual.
-
KeyedSequence::getPendingOperations(Tk $key): int<0, max>
Get the number of operations pending execution for the given key.
-
KeyedSequence::getTotalPendingOperations(): int<0, max>
Get the total number of operations pending execution.
-
KeyedSequence::hasPendingOperations(Tk $key): bool
Check if there's any operations pending execution for the given key.
If this method returns
true
, it means the sequence is busy, future calls towaitFor
will wait.
-
KeyedSequence::hasAnyPendingOperations(): bool
Check if there's any operations pending execution.
-
KeyedSequence::hasIngoingOperations(Tk $key): bool
Check if the sequence has any ingoing operations for the given key.
If this method returns
true
, it means future calls towaitFor
will wait. If this method returnsfalse
, it means future calls towaitFor
will execute immediately.
-
KeyedSequence::hasAnyIngoingOperations(): bool
Check if the sequence has any ingoing operations.
-
KeyedSequence::getTotalIngoingOperations(): int<0, max>
Get the number of total ingoing operations.
-
KeyedSequence::waitForPending(Tk $key): void
Wait for all pending operations for the given key to complete.
-
-
Warning
The
Deferred
API described below is an advanced API that many applications probably don’t need. Userun(...)
, and otherAsync
combinators when possible.Deferred
is the abstraction responsible for resolving future values once they become available.A library that completes values asynchronously creates an
Deferred
and uses it to return anAwaitable
to API consumers.Once the async library determines that the value is ready it completes the
Awaitable
held by the API consumer using methods on the linkedDeferred
.
use Psl; use Psl\Async; use Psl\IO; $deferred = new Async\Deferred(); $deferred->getAwaitable()->then( static fn($result) => Psl\invariant($result === 'hello', 'Should be "hello".'), static fn($error) => Psl\invariant(false, 'Should not have failed.'), ); $deferred->complete('hello');
-
Deferred::getAwaitable()
Returns the
Awaitable
that will be completed when the value is available.Deferred
andAwaitable
are separated, so the consumer of theAwaitable
can’t complete it. You should always return$deferred->getAwaitable()
to API consumers. If you’re passingDeferred
objects around, you’re probably doing something wrong.
use Psl\Async; /** * @return Async\Awaitable<'hello'> */ function get_message(): Async\Awaitable { $deferred = new Async\Deferred(); // Complete the deferred with 'hello' after 2 second. Async\Scheduler::delay(2, static fn() => $deferred->complete('hello')); return $deferred->getAwaitable(); } get_message()->await(); // 'hello'
-
Deferred::complete(T $value)
Completes the
Awaitable
with the given value.
use Psl\Async; $deferred = new Async\Deferred(); $deferred->complete('hello'); $result = $deferred->getAwaitable()->await(); // 'hello'
-
Deferred::error(Exception $exception): void
Makes the
Awaitable
fail with the given$exception
.
use Psl\Async; $deferred = new Async\Deferred(); $deferred->error(new Exception('Something went wrong!')); try { $deferred->getAwaitable()->await(); } catch (Exception $e) { // handle the exception... }
-
Deferred::isComplete(): bool
Returns
true
if theAwaitable
has been completed.
use Psl\Async; $deferred = new Async\Deferred(); Psl\invariant(false === $deferred->isComplete(), 'Should be pending.'); $deferred->complete('hello'); Psl\invariant(true === $deferred->isComplete(), 'Should be complete.');
-
-
Psl wrapper around Revolt event-loop.
See revolt.run for more information.
-
static Scheduler::createSuspension(): Revolt\EventLoop\Suspension
Create an object used to suspend and resume execution, either within a fiber or from {main}.
use Psl\Async; $suspension = Async\Scheduler::createSuspension(); // schedule the resume of the suspension 2 seconds from now. Async\Scheduler::delay(2.0, static fn() => $suspension->resume()); // suspend the suspension until it is resumed. $suspension->suspend();
-
static Scheduler::onSignal(int $signal_number, (Closure(string, int): void) $callback): non-empty-string
Register a callback to be called when the given signal is received.
Returns a unique identifier that can be used to cancel, enable or disable the callback.
see revolt documentation for more information.
-
static Scheduler::onReadable(object|resource $stream, (Closure(string, object|resource): void) $callback): non-empty-string
Execute a callback when a stream resource becomes readable or is closed for reading.
Returns a unique identifier that can be used to cancel, enable or disable the callback.
see revolt documentation for more information.
-
static Scheduler::onWritable(object|resource $stream, (Closure(string, object|resource): void) $callback): non-empty-string
Execute a callback when a stream resource becomes writable or is closed for writing.
Returns a unique identifier that can be used to cancel, enable or disable the callback.
see revolt documentation for more information.
-
static Scheduler::defer((Closure(): void) $callback): non-empty-string
Defer the execution of a callback.
Returns a unique identifier that can be used to cancel, enable or disable the callback.
see revolt documentation for more information.
-
static Scheduler::delay(float $seconds, (Closure(): void) $callback): non-empty-string
Delay the execution of a callback.
Returns a unique identifier that can be used to cancel, enable or disable the callback.
see revolt documentation for more information.
-
static Scheduler::repeat(float $interval, (Closure(): void) $callback): non-empty-string
Repeatedly execute a callback.
Returns a unique identifier that can be used to cancel, enable or disable the callback.
see revolt documentation for more information.
-
static Scheduler::cancel(string $id): void
Cancel a callback.
see revolt documentation for more information.
-
static Scheduler::enable(string $id): void
Enable a callback.
see revolt documentation for more information.
-
static Scheduler::disable(string $id): void
Disable a callback.
see revolt documentation for more information.
-
static Scheduler::reference(string $id): void
Reference a callback.
see revolt documentation for more information.
-
static Scheduler::unreference(string $id): void
Remove a reference to a callback.
see revolt documentation for more information.
-
static Scheduler::queue((Closure(): void) $callback): void
Queue a microtask.
-
static Scheduler::run(): void
Run the event loop.
This method will wait until there's no more callbacks to execute.
If a signal callback is registered, this method will block until the signal is received.
see revolt documentation for more information.
-
-
final class Exception\ComositeException
A
Exception\CompositeException
that can be used to wrap multipleException
s.
-
CompositeException::__construct(non-empty-array<array-key, Exception> $reasons)
Constructs a new
Exception\CompositeException
with the given$reasons
.
use Psl\Async; $exception = new Async\Exception\CompositeException([ new Exception('Something went wrong!'), new Exception('Something else went wrong!'), ]);
-
CompositeException::getReasons(): non-empty-array<array-key, Exception>
Returns the
$reasons
that were wrapped.
use Psl\Async; $exception = new Async\Exception\CompositeException([ new Exception('Something went wrong!'), new Exception('Something else went wrong!'), ]); $exceptions = $exception->getReasons();
-
-
final class Exception\TimeoutException
A
Exception\TimeoutException
is thrown when a task is not completed within the given$timeout
.
use Psl\Async; use Psl\IO; $awaitable = Async\run(static function(): void { Async\sleep(4); }, timeout: 1.0); try { $awaitable->await(); } catch (Async\Exception\TimeoutException $exception) { IO\write_error_line('Task timed out!'); }
-
final class Exception\UnhandledAwaitableException
A
Exception\UnhandledAwaitableException
is thrown from the scheduler when a failedAwaitable
is not handled.
use Psl\Async; Async\run(static function(): void { throw new Exception('Something went wrong!'); }); try { Async\Scheduler::run(); } catch (Async\Exception\UnhandledAwaitableException $exception) { IO\write_error_line('Unhandled awaitable!'); IO\write_error_line('Previous exception: %s', $exception->getPrevious()->getMessage()); } // Output: // Unhandled awaitable! // Previous exception: Something went wrong!