Skip to content

Commit

Permalink
Support for 'STARTED' task state.
Browse files Browse the repository at this point in the history
Also, if the state is not one of the READY_STATES (as defined in Celery
source), do not block fetching new results from result backend.

See gjedeer#49
  • Loading branch information
smuuf committed Aug 28, 2020
1 parent 9655ad0 commit 9c9e624
Showing 1 changed file with 29 additions and 13 deletions.
42 changes: 29 additions & 13 deletions src/AsyncResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
*/
class AsyncResult
{
private const READY_STATES = ['SUCCESS', 'FAILURE', 'REVOKED'];
private const UNREADY_STATES = ['PENDING', 'RECEIVED', 'STARTED', 'REJECTED', 'RETRY'];

private $task_id; // string, queue name
private $connection; // AMQPConnection instance
private $connection_details; // array of strings required to connect
Expand Down Expand Up @@ -45,8 +48,10 @@ public function __wakeup()
* Connect to queue, see if there's a result waiting for us
* Private - to be used internally
*/
private function getCompleteResult()
private function fetchCurrentResult()
{
// If task's state is one of the unready states, fetch the result
// again. Otherwise return the most recent state.
if ($this->complete_result) {
return $this->complete_result;
}
Expand All @@ -58,14 +63,26 @@ private function getCompleteResult()
$this->remove_from_queue
);

if ($message !== false) {
if (!$message) {
return false;
}

if (
!in_array(
$message['complete_result']['status'],
self::UNREADY_STATES,
true
)
) {
$this->complete_result = $message['complete_result'];
$this->body = json_decode(
$message['body']
);
}

return false;
$this->body = json_decode(
$message['body']
);

return $message['complete_result'];

}

/**
Expand All @@ -92,7 +109,9 @@ public function getId()
*/
public function isReady()
{
return ($this->getCompleteResult() !== false);
$this->fetchCurrentResult();
return !empty($this->body->status)
&& in_array($this->body->status, self::READY_STATES, true);
}

/**
Expand All @@ -101,8 +120,9 @@ public function isReady()
*/
public function getStatus()
{
$this->fetchCurrentResult();
if (!$this->body) {
throw new CeleryException('Called getStatus before task was ready');
return 'PENDING';
}
return $this->body->status;
}
Expand Down Expand Up @@ -230,11 +250,7 @@ public function __get($property)
* status: Deprecated alias of state.
*/
elseif ($property == 'state' || $property == 'status') {
if ($this->isReady()) {
return $this->getStatus();
} else {
return 'PENDING';
}
return $this->getStatus();
}

return $this->$property;
Expand Down

0 comments on commit 9c9e624

Please sign in to comment.