Skip to content
This repository has been archived by the owner on Dec 4, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2 from mallgroup/feature-lazy-connect
Browse files Browse the repository at this point in the history
Added lazy connect option
  • Loading branch information
Radovan Kepák authored Jul 21, 2021
2 parents 2137574 + 84d219b commit 46f909b
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 4 deletions.
9 changes: 7 additions & 2 deletions .docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ rabbitmq:
queues:
testQueue:
connection: default
# force queue declare on first queue operation during request
# force queue on queue usage (no matter if message is send or not)
# autoCreate: true
# force queue declare on first queue operation
# autoCreate: lazy
exchanges:
testExchange:
Expand Down Expand Up @@ -88,7 +90,10 @@ php index.php rabbitmq:declareQueuesAndExchanges
It's intended to be a part of the deploy process to make sure all the queues and exchanges are prepared for use.

If you need to override this behavior (for example only declare queues that are used during a request and nothing else),
just add the `autoCreate: true` parameter to queue or exchange of your choice.
just add the `autoCreate: true` parameter to queue or exchange of your choice. But this will try to create them even if
exchange is added into script but not used at all (ie: autoloaded). If you really want to use this with first message
published set value to `autoCreate: lazy`. That will ensure exchanges/queues are declared after really connection is
established and needed.

You may also want to declare the queues and exchanges via rabbitmq management interface or a script but if you fail to
do so, don't run the declare console command and don't specify `autoCreate: true`, exceptions will be thrown when
Expand Down
24 changes: 24 additions & 0 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ final class Connection implements IConnection
private array $connectionParams;
private int $lastBeat = 0;
private ?Channel $channel = null;
private array $onConnect = [];


public function __construct(
Expand Down Expand Up @@ -61,6 +62,18 @@ public function __destruct()
}


public function onConnect(callable $callback): void
{
if ($this->bunnyClient->isConnected()) {
$callback();

return;
}

$this->onConnect[] = $callback;
}


/**
* @throws ConnectionException
*/
Expand Down Expand Up @@ -113,6 +126,7 @@ public function connectIfNeeded(): void
}

$this->bunnyClient->connect();
$this->invokeCallbacks();
}


Expand All @@ -125,6 +139,7 @@ public function sendHeartbeat(): void
}
}


public function reconnect(): void
{
$this->bunnyClient->syncDisconnect(); // close current connection to get rid of error on script exit - destructor
Expand All @@ -140,6 +155,15 @@ public function reconnect(): void
$this->channel = $channel;
}


private function invokeCallbacks(): void
{
foreach ($this->onConnect as $callback) {
$callback();
}
}


private function createNewConnection(): Client
{
return new Client($this->connectionParams);
Expand Down
2 changes: 2 additions & 0 deletions src/Connection/IConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ public function getChannel(): Channel;
public function sendHeartbeat(): void;

public function reconnect(): void;

public function onConnect(callable $callback): void;
}
6 changes: 5 additions & 1 deletion src/Exchange/ExchangeFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ private function create(string $name): IExchange
$connection = $this->connectionFactory->getConnection($exchangeData['connection']);

if ($exchangeData['autoCreate']) {
$this->exchangeDeclarator->declareExchange($name);
if ($exchangeData['autoCreate'] === 'lazy') {
$connection->onConnect(fn () => $this->exchangeDeclarator->declareExchange($name));
} else {
$this->exchangeDeclarator->declareExchange($name);
}
}

if ($exchangeData['queueBindings'] !== []) {
Expand Down
6 changes: 5 additions & 1 deletion src/Queue/QueueFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ private function create(string $name): IQueue
$connection = $this->connectionFactory->getConnection($queueData['connection']);

if ($queueData['autoCreate']) {
$this->queueDeclarator->declareQueue($name);
if ($queueData['autoCreate'] === 'lazy') {
$connection->onConnect(fn () => $this->queueDeclarator->declareQueue($name));
} else {
$this->queueDeclarator->declareQueue($name);
}
}

return new Queue(
Expand Down

0 comments on commit 46f909b

Please sign in to comment.