Skip to content

Commit e4b0331

Browse files
authored
Merge pull request #7 from clue-labs/cancel
Implement cancellation forwarding for previously queued operations
2 parents 80226a0 + c1bd5eb commit e4b0331

File tree

3 files changed

+109
-22
lines changed

3 files changed

+109
-22
lines changed

.travis.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ php:
88
- 7.0
99
- 7.1
1010
- 7.2
11-
- hhvm
11+
- hhvm # ignore errors, see below
1212

1313
# lock distro so new future defaults will not break the build
1414
dist: trusty
@@ -17,6 +17,8 @@ matrix:
1717
include:
1818
- php: 5.3
1919
dist: precise
20+
allow_failures:
21+
- php: hhvm
2022

2123
sudo: false
2224

src/Queue.php

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Clue\React\Mq;
44

55
use React\Promise;
6+
use React\Promise\CancellablePromiseInterface;
67
use React\Promise\Deferred;
78
use React\Promise\PromiseInterface;
89

@@ -134,30 +135,26 @@ public function __invoke()
134135
end($queue);
135136
$id = key($queue);
136137

137-
$deferred = new Deferred(function ($_, $reject) use (&$queue, $id) {
138-
// queued promise cancelled before its handler is invoked
139-
// remove from queue and reject explicitly
140-
unset($queue[$id]);
141-
$reject(new \RuntimeException('Cancelled queued job before processing started'));
138+
$deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$deferred) {
139+
// forward cancellation to pending operation if it is currently executing
140+
if (isset($deferred->pending) && $deferred->pending instanceof CancellablePromiseInterface) {
141+
$deferred->pending->cancel();
142+
}
143+
unset($deferred->pending);
144+
145+
if (isset($deferred->args)) {
146+
// queued promise cancelled before its handler is invoked
147+
// remove from queue and reject explicitly
148+
unset($queue[$id], $deferred->args);
149+
$reject(new \RuntimeException('Cancelled queued job before processing started'));
150+
}
142151
});
143152

144153
// queue job to process if number of pending jobs is below concurrency limit again
154+
$deferred->args = func_get_args();
145155
$queue[$id] = $deferred;
146156

147-
// once number of pending jobs is below concurrency limit again:
148-
// await this situation, invoke handler and await its resolution before invoking next queued job
149-
$handler = $this->handler;
150-
$args = func_get_args();
151-
$that = $this;
152-
$pending =& $this->pending;
153-
return $deferred->promise()->then(function () use ($handler, $args, $that, &$pending) {
154-
++$pending;
155-
156-
// invoke handler and await its resolution before invoking next queued job
157-
return $that->await(
158-
call_user_func_array($handler, $args)
159-
);
160-
});
157+
return $deferred->promise();
161158
}
162159

163160
public function count()
@@ -193,9 +190,28 @@ public function processQueue()
193190
return;
194191
}
195192

196-
$first = reset($this->queue);
193+
/* @var $deferred Deferred */
194+
$deferred = reset($this->queue);
197195
unset($this->queue[key($this->queue)]);
198196

199-
$first->resolve();
197+
// once number of pending jobs is below concurrency limit again:
198+
// await this situation, invoke handler and await its resolution before invoking next queued job
199+
++$this->pending;
200+
201+
$promise = call_user_func_array($this->handler, $deferred->args);
202+
$deferred->pending = $promise;
203+
unset($deferred->args);
204+
205+
// invoke handler and await its resolution before invoking next queued job
206+
$this->await($promise)->then(
207+
function ($result) use ($deferred) {
208+
unset($deferred->pending);
209+
$deferred->resolve($result);
210+
},
211+
function ($e) use ($deferred) {
212+
unset($deferred->pending);
213+
$deferred->reject($e);
214+
}
215+
);
200216
}
201217
}

tests/QueueTest.php

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,73 @@ public function testCancelPendingWillRejectPromiseAndRemoveJobFromQueue()
211211
$second->then(null, $this->expectCallableOnce());
212212
$this->assertCount(1, $q);
213213
}
214+
215+
public function testCancelPendingOperationThatWasPreviousQueuedShouldInvokeItsCancellationHandler()
216+
{
217+
$q = new Queue(1, null, function ($promise) {
218+
return $promise;
219+
});
220+
221+
$deferred = new Deferred();
222+
$first = $q($deferred->promise());
223+
224+
$second = $q(new Promise(function () { }, $this->expectCallableOnce()));
225+
226+
$deferred->resolve();
227+
$second->cancel();
228+
}
229+
230+
public function testCancelPendingOperationThatWasPreviouslyQueuedShouldRejectWithCancellationResult()
231+
{
232+
$q = new Queue(1, null, function ($promise) {
233+
return $promise;
234+
});
235+
236+
$deferred = new Deferred();
237+
$first = $q($deferred->promise());
238+
239+
$second = $q(new Promise(function () { }, function () { throw new \BadMethodCallException(); }));
240+
241+
$deferred->resolve();
242+
$second->cancel();
243+
244+
$second->then(null, $this->expectCallableOnceWith($this->isInstanceOf('BadMethodCallException')));
245+
}
246+
247+
public function testCancelPendingOperationThatWasPreviouslyQueuedShouldNotRejectIfCancellationHandlerDoesNotReject()
248+
{
249+
$q = new Queue(1, null, function ($promise) {
250+
return $promise;
251+
});
252+
253+
$deferred = new Deferred();
254+
$first = $q($deferred->promise());
255+
256+
$second = $q(new Promise(function () { }, function () { }));
257+
258+
$deferred->resolve();
259+
$second->cancel();
260+
261+
$second->then($this->expectCallableNever(), $this->expectCallableNever());
262+
}
263+
264+
public function testCancelNextOperationFromFirstOperationShouldInvokeCancellationHandler()
265+
{
266+
$q = new Queue(1, null, function () {
267+
return new Promise(function () { }, function () {
268+
throw new \RuntimeException();
269+
});
270+
});
271+
272+
$first = $q();
273+
$second = $q();
274+
275+
$first->then(null, function () use ($second) {
276+
$second->cancel();
277+
});
278+
279+
$first->cancel();
280+
281+
$second->then(null, $this->expectCallableOnce());
282+
}
214283
}

0 commit comments

Comments
 (0)