diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index c775efc02..5fdf6b59e 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -39,13 +39,13 @@ class DbalContext implements Context * Callable must return instance of Doctrine\DBAL\Connection once called. * * @param Connection|callable $connection - * @param array $config */ public function __construct($connection, array $config = []) { $this->config = array_replace([ 'table_name' => 'enqueue', 'polling_interval' => null, + 'subscription_polling_interval' => null, ], $config); if ($connection instanceof Connection) { @@ -53,11 +53,7 @@ public function __construct($connection, array $config = []) } elseif (is_callable($connection)) { $this->connectionFactory = $connection; } else { - throw new \InvalidArgumentException(sprintf( - 'The connection argument must be either %s or callable that returns %s.', - Connection::class, - Connection::class - )); + throw new \InvalidArgumentException(sprintf('The connection argument must be either %s or callable that returns %s.', Connection::class, Connection::class)); } } @@ -135,6 +131,10 @@ public function createSubscriptionConsumer(): SubscriptionConsumer $consumer->setRedeliveryDelay($this->config['redelivery_delay']); } + if (isset($this->config['subscription_polling_interval'])) { + $consumer->setPollingInterval($this->config['subscription_polling_interval']); + } + return $consumer; } @@ -202,10 +202,7 @@ public function getDbalConnection(): Connection if (false == $this->connection) { $connection = call_user_func($this->connectionFactory); if (false == $connection instanceof Connection) { - throw new \LogicException(sprintf( - 'The factory must return instance of Doctrine\DBAL\Connection. It returns %s', - is_object($connection) ? get_class($connection) : gettype($connection) - )); + throw new \LogicException(sprintf('The factory must return instance of Doctrine\DBAL\Connection. It returns %s', is_object($connection) ? get_class($connection) : gettype($connection))); } $this->connection = $connection; diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 60d30cc7e..2551043e3 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -37,8 +37,12 @@ class DbalSubscriptionConsumer implements SubscriptionConsumer private $redeliveryDelay; /** - * @param DbalContext $context + * Time to wait between subscription requests in milliseconds. + * + * @var int */ + private $pollingInterval = 200; + public function __construct(DbalContext $context) { $this->context = $context; @@ -63,6 +67,18 @@ public function setRedeliveryDelay(int $redeliveryDelay): self return $this; } + public function getPollingInterval(): int + { + return $this->pollingInterval; + } + + public function setPollingInterval(int $msec): self + { + $this->pollingInterval = $msec; + + return $this; + } + public function consume(int $timeout = 0): void { if (empty($this->subscribers)) { @@ -92,7 +108,7 @@ public function consume(int $timeout = 0): void * @var DbalConsumer * @var callable $callback */ - list($consumer, $callback) = $this->subscribers[$message->getQueue()]; + [$consumer, $callback] = $this->subscribers[$message->getQueue()]; if (false === call_user_func($callback, $message, $consumer)) { return; @@ -102,7 +118,7 @@ public function consume(int $timeout = 0): void } else { $currentQueueNames = []; - usleep(200000); // 200ms + usleep($this->getPollingInterval() * 1000); } if ($timeout && microtime(true) >= $now + $timeout) { diff --git a/pkg/dbal/Tests/DbalContextTest.php b/pkg/dbal/Tests/DbalContextTest.php index 0b793e6e7..12a13b21a 100644 --- a/pkg/dbal/Tests/DbalContextTest.php +++ b/pkg/dbal/Tests/DbalContextTest.php @@ -39,6 +39,7 @@ public function testCouldBeConstructedWithEmptyConfiguration() $this->assertAttributeEquals([ 'table_name' => 'enqueue', 'polling_interval' => null, + 'subscription_polling_interval' => null, ], 'config', $factory); } @@ -47,11 +48,13 @@ public function testCouldBeConstructedWithCustomConfiguration() $factory = new DbalContext($this->createConnectionMock(), [ 'table_name' => 'theTableName', 'polling_interval' => 12345, + 'subscription_polling_interval' => 12345, ]); $this->assertAttributeEquals([ 'table_name' => 'theTableName', 'polling_interval' => 12345, + 'subscription_polling_interval' => 12345, ], 'config', $factory); }