Skip to content

Commit

Permalink
Added support for multiple entity managers
Browse files Browse the repository at this point in the history
  • Loading branch information
sisve committed Apr 5, 2019
1 parent 21744ad commit d10bdcf
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/DoctrineQueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ protected function registerWorker()

$this->app->singleton('safeQueue.worker', function ($app) {
return new Worker($app['queue'], $app['events'],
$app['em'], $app['Illuminate\Contracts\Debug\ExceptionHandler']);
$app['Doctrine\Common\Persistence\ManagerRegistry'], $app['Illuminate\Contracts\Debug\ExceptionHandler']);
});
}

Expand Down
33 changes: 19 additions & 14 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

namespace MaxBrokman\SafeQueue;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManager;
use Exception;
use Illuminate\Contracts\Debug\ExceptionHandler;
Expand All @@ -16,27 +17,27 @@
/*final*/ class Worker extends IlluminateWorker
{
/**
* @var EntityManager
* @var ManagerRegistry
*/
private $entityManager;
protected $managerRegistry;

/**
* Worker constructor.
*
* @param QueueManager $manager
* @param Dispatcher $events
* @param EntityManager $entityManager
* @param ManagerRegistry $managerRegistry
* @param ExceptionHandler $exceptions
*/
public function __construct(
QueueManager $manager,
Dispatcher $events,
EntityManager $entityManager,
ManagerRegistry $managerRegistry,
ExceptionHandler $exceptions
) {
parent::__construct($manager, $events, $exceptions);

$this->entityManager = $entityManager;
$this->managerRegistry = $managerRegistry;
}

/**
Expand Down Expand Up @@ -74,19 +75,21 @@ protected function runJob($job, $connectionName, WorkerOptions $options)
*/
private function assertEntityManagerOpen()
{
if ($this->entityManager->isOpen()) {
return;
foreach ($this->managerRegistry->getManagers() as $entityManager) {
if (!$entityManager->isOpen()) {
throw new EntityManagerClosedException;
}
}

throw new EntityManagerClosedException;
}

/**
* To clear the em before doing any work.
*/
private function assertEntityManagerClear()
{
$this->entityManager->clear();
foreach ($this->managerRegistry->getManagers() as $entityManager) {
$entityManager->clear();
}
}

/**
Expand All @@ -96,11 +99,13 @@ private function assertEntityManagerClear()
*/
private function assertGoodDatabaseConnection()
{
$connection = $this->entityManager->getConnection();
foreach ($this->managerRegistry->getManagers() as $entityManager) {
$connection = $entityManager->getConnection();

if ($connection->ping() === false) {
$connection->close();
$connection->connect();
if ($connection->ping() === false) {
$connection->close();
$connection->connect();
}
}
}
}
41 changes: 40 additions & 1 deletion tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

namespace tests\MaxBrokman\SafeQueue;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\DBAL\Connection;
use Doctrine\ORM\EntityManager;
use Doctrine\ORM\EntityManagerInterface;
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Contracts\Debug\ExceptionHandler as Handler;
use Illuminate\Contracts\Events\Dispatcher;
Expand Down Expand Up @@ -53,6 +55,11 @@ class WorkerTest extends \PHPUnit_Framework_TestCase
*/
private $exceptions;

/**
* @var ManagerRegistry|m\MockInterface
*/
private $managerRegistry;

/**
* @var Worker
*/
Expand All @@ -72,8 +79,9 @@ protected function setUp()
$this->dbConnection = m::mock(Connection::class);
$this->cache = m::mock(Repository::class);
$this->exceptions = m::mock(Handler::class);
$this->managerRegistry = m::mock(ManagerRegistry::class);

$this->worker = new Worker($this->queueManager, $this->dispatcher, $this->entityManager, $this->exceptions);
$this->worker = new Worker($this->queueManager, $this->dispatcher, $this->managerRegistry, $this->exceptions);

$this->options = new WorkerOptions(0, 128, 0, 0, 0);

Expand Down Expand Up @@ -128,6 +136,37 @@ public function testChecksEmState()
$this->dbConnection->shouldReceive('close')->once();
$this->dbConnection->shouldReceive('connect')->once();

$this->managerRegistry->shouldReceive('getManagers')->andReturn(array($this->entityManager));

$this->worker->runNextJob('connection', 'queue', $this->options);
}

public function testMultipleEntityManagers() {
$job = m::mock(Job::class);
$job->shouldReceive('fire')->once();
$job->shouldIgnoreMissing();

$this->prepareToRunJob($job);

$this->entityManager->shouldReceive('isOpen')->once()->andReturn(true);
$this->entityManager->shouldReceive('clear')->once();

$this->dbConnection->shouldReceive('ping')->once()->andReturn(false);
$this->dbConnection->shouldReceive('close')->once();
$this->dbConnection->shouldReceive('connect')->once();

$secondConnection = m::mock(Connection::class);
$secondConnection->shouldReceive('ping')->once()->andReturn(false);
$secondConnection->shouldReceive('close')->once();
$secondConnection->shouldReceive('connect')->once();

$secondManager = m::mock(EntityManagerInterface::class);
$secondManager->shouldReceive('getConnection')->andReturn($secondConnection);
$secondManager->shouldReceive('isOpen')->once()->andReturn(true);
$secondManager->shouldReceive('clear')->once();

$this->managerRegistry->shouldReceive('getManagers')->andReturn(array($this->entityManager, $secondManager));

$this->worker->runNextJob('connection', 'queue', $this->options);
}
}

0 comments on commit d10bdcf

Please sign in to comment.