Skip to content

Commit

Permalink
TASK: Tweaks after code review
Browse files Browse the repository at this point in the history
- simplify query builder use
- add some sprintf
- rename some variables
- amend docblocks
  • Loading branch information
kdambekalns committed Jan 19, 2024
1 parent 09c8a34 commit ba14187
Showing 1 changed file with 89 additions and 54 deletions.
143 changes: 89 additions & 54 deletions Classes/Queue/DoctrineQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,29 @@ public function getName(): string
/**
* @inheritdoc
* @throws DBALException
* @throws \JsonException
* @throws \Doctrine\DBAL\Driver\Exception
*/
public function submit($payload, array $options = []): string
{
if ($this->connection->getDatabasePlatform()->getName() === 'postgresql') {
$insertStatement = $this->connection->prepare("INSERT INTO {$this->connection->quoteIdentifier($this->tableName)} (payload, state, scheduled) VALUES (:payload, 'ready', {$this->resolveScheduledQueryPart($options)}) RETURNING id");
$result = $insertStatement->executeQuery(['payload' => json_encode($payload)]);
$insertStatement = $this->connection->prepare(sprintf(
"INSERT INTO %s (payload, state, scheduled) VALUES (:payload, 'ready', %s) RETURNING id",
$this->connection->quoteIdentifier($this->tableName),
$this->resolveScheduledQueryPart($options)
));
$result = $insertStatement->executeQuery(['payload' => json_encode($payload, JSON_THROW_ON_ERROR)]);
return (string)$result->fetchOne();
}

$numberOfAffectedRows = (int)$this->connection->executeStatement("INSERT INTO {$this->connection->quoteIdentifier($this->tableName)} (payload, state, scheduled) VALUES (:payload, 'ready', {$this->resolveScheduledQueryPart($options)})", ['payload' => json_encode($payload)]);
$numberOfAffectedRows = (int)$this->connection->executeStatement(
sprintf(
"INSERT INTO %s (payload, state, scheduled) VALUES (:payload, 'ready', %s)",
$this->connection->quoteIdentifier($this->tableName),
$this->resolveScheduledQueryPart($options)
),
['payload' => json_encode($payload, JSON_THROW_ON_ERROR)]
);
if ($numberOfAffectedRows !== 1) {
return '';
}
Expand All @@ -164,6 +177,9 @@ public function submit($payload, array $options = []): string
/**
* @inheritdoc
* @throws DBALException
* @throws Exception
* @throws \Doctrine\DBAL\Driver\Exception
* @throws \JsonException
*/
public function waitAndTake(?int $timeout = null): ?Message
{
Expand All @@ -172,7 +188,10 @@ public function waitAndTake(?int $timeout = null): ?Message
return null;
}

$numberOfDeletedRows = $this->connection->delete($this->connection->quoteIdentifier($this->tableName), ['id' => (int)$message->getIdentifier()]);
$numberOfDeletedRows = $this->connection->delete(
$this->connection->quoteIdentifier($this->tableName),
['id' => (int)$message->getIdentifier()]
);
if ($numberOfDeletedRows !== 1) {
// TODO error handling
return null;
Expand All @@ -184,6 +203,8 @@ public function waitAndTake(?int $timeout = null): ?Message
/**
* @inheritdoc
* @throws DBALException
* @throws \Doctrine\DBAL\Driver\Exception
* @throws \JsonException
*/
public function waitAndReserve(?int $timeout = null): ?Message
{
Expand All @@ -192,6 +213,8 @@ public function waitAndReserve(?int $timeout = null): ?Message

/**
* @throws DBALException
* @throws \JsonException
* @throws \Doctrine\DBAL\Driver\Exception
*/
protected function reserveMessage(?int $timeout = null): ?Message
{
Expand All @@ -200,38 +223,44 @@ protected function reserveMessage(?int $timeout = null): ?Message
}
$this->reconnectDatabaseConnection();

$qb = $this->connection->createQueryBuilder();
$qb
$selectMessageQuery = $this->createQueryBuilder();
$selectMessageQuery
->select('*')
->from($this->connection->quoteIdentifier($this->tableName))
->where(
$qb->expr()->and(
$qb->expr()->eq('state', $qb->expr()->literal('ready')),
$this->getScheduledQueryConstraint($qb)
$selectMessageQuery->expr()->and(
'state = :state',
$this->getScheduledQueryConstraint($selectMessageQuery)
)
)
->setParameter('state', 'ready')
->orderBy('id')
->setMaxResults(1);

$reserveMessageQuery = $this->connection->createQueryBuilder();
$reserveMessageQuerySql = $reserveMessageQuery
->update($this->connection->quoteIdentifier($this->tableName))
->set('state', ':newstate')
->where('id = :id')
->andWhere('state = :state')
->andWhere($this->getScheduledQueryConstraint($reserveMessageQuery))
->getSQL();

$startTime = time();
do {
try {
$row = $qb->execute()->fetchAssociative();
$row = $selectMessageQuery->execute()->fetchAssociative();
} catch (TableNotFoundException $exception) {
throw new \RuntimeException(sprintf('The queue table "%s" could not be found. Did you run ./flow queue:setup "%s"?', $this->tableName, $this->name), 1469117906, $exception);
}
if ($row !== false) {
$innerQueryBuilder = $this->connection->createQueryBuilder();
$innerQueryBuilder
->update($this->connection->quoteIdentifier($this->tableName))
->set('state', $innerQueryBuilder->expr()->literal('reserved'))
->where(
$innerQueryBuilder->expr()->and(
$innerQueryBuilder->expr()->eq('id', (int)$row['id']),
$innerQueryBuilder->expr()->eq('state', $innerQueryBuilder->expr()->literal('ready')),
$this->getScheduledQueryConstraint($innerQueryBuilder)
)
);
$numberOfUpdatedRows = (int)$this->connection->executeStatement($innerQueryBuilder->getSQL());
$numberOfUpdatedRows = (int)$this->connection->executeStatement(
$reserveMessageQuerySql,
[
'newstate' => 'reserved',
'id' => (int)$row['id'],
'state' => 'ready'
]
);
if ($numberOfUpdatedRows === 1) {
$this->lastMessageTime = time();
return $this->getMessageFromRow($row);
Expand All @@ -252,15 +281,25 @@ protected function reserveMessage(?int $timeout = null): ?Message
*/
public function release(string $messageId, array $options = []): void
{
$this->connection->executeStatement("UPDATE {$this->connection->quoteIdentifier($this->tableName)} SET state = 'ready', failures = failures + 1, scheduled = {$this->resolveScheduledQueryPart($options)} WHERE id = :id", ['id' => (int)$messageId]);
$this->connection->executeStatement(
sprintf(
"UPDATE %s SET state = 'ready', failures = failures + 1, scheduled = %s WHERE id = :id",
$this->connection->quoteIdentifier($this->tableName),
$this->resolveScheduledQueryPart($options)
),
['id' => (int)$messageId]);
}

/**
* @inheritdoc
*/
public function abort(string $messageId): void
{
$this->connection->update($this->connection->quoteIdentifier($this->tableName), ['state' => 'failed'], ['id' => (int)$messageId]);
$this->connection->update(
$this->connection->quoteIdentifier($this->tableName),
['state' => 'failed'],
['id' => (int)$messageId]
);
}

/**
Expand All @@ -277,19 +316,15 @@ public function finish(string $messageId): bool
*/
public function peek(int $limit = 1): array
{
$qb = $this->connection->createQueryBuilder();
$qb
$selectMessagesQuery = $this->createQueryBuilder();
$selectMessagesQuery
->select('*')
->from($this->connection->quoteIdentifier($this->tableName))
->where(
$qb->expr()->and(
$qb->expr()->eq('state', $qb->expr()->literal('ready')),
$this->getScheduledQueryConstraint($qb)
)
)
->where('state = :state')
->andWhere($this->getScheduledQueryConstraint($selectMessagesQuery))
->setParameter('state', 'ready')
->orderBy('id')
->setMaxResults($limit);
$rows = $qb->execute()->fetchAllAssociative();
$rows = $selectMessagesQuery->execute()->fetchAllAssociative();
$messages = [];

foreach ($rows as $row) {
Expand All @@ -304,13 +339,10 @@ public function peek(int $limit = 1): array
*/
public function countReady(): int
{
$qb = $this->connection->createQueryBuilder();
return (int)$qb
return (int)$this->createQueryBuilder()
->select('COUNT(*)')
->from($this->connection->quoteIdentifier($this->tableName))
->where(
$qb->expr()->eq('state', $qb->expr()->literal('ready')),
)
->where('state = :state')
->setParameter('state', 'ready')
->execute()
->fetchOne();
}
Expand All @@ -320,13 +352,10 @@ public function countReady(): int
*/
public function countReserved(): int
{
$qb = $this->connection->createQueryBuilder();
return (int)$qb
return (int)$this->createQueryBuilder()
->select('COUNT(*)')
->from($this->connection->quoteIdentifier($this->tableName))
->where(
$qb->expr()->eq('state', $qb->expr()->literal('reserved')),
)
->where('state = :state')
->setParameter('state', 'reserved')
->execute()
->fetchOne();
}
Expand All @@ -336,13 +365,10 @@ public function countReserved(): int
*/
public function countFailed(): int
{
$qb = $this->connection->createQueryBuilder();
return (int)$qb
return (int)$this->createQueryBuilder()
->select('COUNT(*)')
->from($this->connection->quoteIdentifier($this->tableName))
->where(
$qb->expr()->eq('state', $qb->expr()->literal('failed')),
)
->where('state = :state')
->setParameter('state', 'failed')
->execute()
->fetchOne();
}
Expand All @@ -363,9 +389,12 @@ public function flush(): void
$this->setUp();
}

/**
* @throws \JsonException
*/
protected function getMessageFromRow(array $row): Message
{
return new Message($row['id'], json_decode($row['payload'], true), (int)$row['failures']);
return new Message($row['id'], json_decode($row['payload'], true, 512, JSON_THROW_ON_ERROR), (int)$row['failures']);
}

/**
Expand All @@ -388,6 +417,12 @@ protected function getScheduledQueryConstraint(QueryBuilder $qb): CompositeExpre
);
}

private function createQueryBuilder(): QueryBuilder
{
return $this->connection->createQueryBuilder()
->from($this->connection->quoteIdentifier($this->tableName));
}

/**
* Reconnects the database connection associated with this queue, if it doesn't respond to a ping
*
Expand Down

0 comments on commit ba14187

Please sign in to comment.