diff --git a/composer.json b/composer.json index d6e7793..1d3bede 100644 --- a/composer.json +++ b/composer.json @@ -18,12 +18,12 @@ }, "require": { "php": ">=5.3", - "react/promise": "^2.2.1 || ^1.2.1" + "react/promise": "^3 || ^2.2.1 || ^1.2.1" }, "require-dev": { - "clue/block-react": "^1.0", + "clue/block-react": "^1.5", "phpunit/phpunit": "^9.3 || ^5.7 || ^4.8.35", "react/event-loop": "^1.2", - "react/http": "^1.5" + "react/http": "^1.8" } } diff --git a/src/Queue.php b/src/Queue.php index 7d399ac..f4e827b 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -3,7 +3,6 @@ namespace Clue\React\Mq; use React\Promise; -use React\Promise\CancellablePromiseInterface; use React\Promise\Deferred; use React\Promise\PromiseInterface; @@ -124,7 +123,7 @@ public static function all($concurrency, array $jobs, $handler) Promise\all($promises)->then($resolve, function ($e) use ($promises, $reject) { // cancel all pending promises if a single promise fails foreach (array_reverse($promises) as $promise) { - if ($promise instanceof CancellablePromiseInterface) { + if ($promise instanceof PromiseInterface && \method_exists($promise, 'cancel')) { $promise->cancel(); } } @@ -135,7 +134,7 @@ public static function all($concurrency, array $jobs, $handler) }, function () use ($promises) { // cancel all pending promises on cancellation foreach (array_reverse($promises) as $promise) { - if ($promise instanceof CancellablePromiseInterface) { + if ($promise instanceof PromiseInterface && \method_exists($promise, 'cancel')) { $promise->cancel(); } } @@ -241,7 +240,7 @@ public static function any($concurrency, array $jobs, $handler) Promise\any($promises)->then(function ($result) use ($promises, $resolve) { // cancel all pending promises if a single result is ready foreach (array_reverse($promises) as $promise) { - if ($promise instanceof CancellablePromiseInterface) { + if ($promise instanceof PromiseInterface && \method_exists($promise, 'cancel')) { $promise->cancel(); } } @@ -252,7 +251,7 @@ public static function any($concurrency, array $jobs, $handler) }, function () use ($promises) { // cancel all pending promises on cancellation foreach (array_reverse($promises) as $promise) { - if ($promise instanceof CancellablePromiseInterface) { + if ($promise instanceof PromiseInterface && \method_exists($promise, 'cancel')) { $promise->cancel(); } } @@ -367,7 +366,7 @@ public function __invoke() $deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$deferred) { // forward cancellation to pending operation if it is currently executing - if (isset($deferred->pending) && $deferred->pending instanceof CancellablePromiseInterface) { + if (isset($deferred->pending) && $deferred->pending instanceof PromiseInterface && \method_exists($deferred->pending, 'cancel')) { $deferred->pending->cancel(); } unset($deferred->pending); diff --git a/tests/QueueTest.php b/tests/QueueTest.php index 0c99102..247f879 100644 --- a/tests/QueueTest.php +++ b/tests/QueueTest.php @@ -217,7 +217,7 @@ public function testCancelPendingOperationThatWasPreviousQueuedShouldInvokeItsCa $second = $q(new Promise(function () { }, $this->expectCallableOnce())); - $deferred->resolve(); + $deferred->resolve(null); $second->cancel(); } @@ -232,7 +232,7 @@ public function testCancelPendingOperationThatWasPreviouslyQueuedShouldRejectWit $second = $q(new Promise(function () { }, function () { throw new \BadMethodCallException(); })); - $deferred->resolve(); + $deferred->resolve(null); $second->cancel(); $second->then(null, $this->expectCallableOnceWith($this->isInstanceOf('BadMethodCallException'))); @@ -249,7 +249,7 @@ public function testCancelPendingOperationThatWasPreviouslyQueuedShouldNotReject $second = $q(new Promise(function () { }, function () { })); - $deferred->resolve(); + $deferred->resolve(null); $second->cancel(); $second->then($this->expectCallableNever(), $this->expectCallableNever());