Skip to content

Added: ability to choose different entity manager #1081

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Nov 24, 2020
Merged
1 change: 1 addition & 0 deletions docs/bundle/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ enqueue:
queue_name: ~
job:
enabled: false
default_mapping: true
async_events:
enabled: false
extensions:
Expand Down
4 changes: 4 additions & 0 deletions docs/bundle/job_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ enqueue:
# plus basic bundle configuration

job: true

# adds bundle's default Job entity mapping to application's entity manager.
# set it to false when using your own mapped entities for jobs.
default_mapping: true

doctrine:
# plus basic bundle configuration
Expand Down
6 changes: 6 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ private function getJobConfiguration(): ArrayNodeDefinition
}

return (new ArrayNodeDefinition('job'))
->children()
->booleanNode('default_mapping')
->defaultTrue()
->info('Adds bundle\'s default Job entity mapping to application\'s entity manager')
->end()
->end()
->addDefaultsIfNotSet()
->canBeEnabled()
;
Expand Down
12 changes: 12 additions & 0 deletions pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,18 @@ private function registerJobQueueDoctrineEntityMapping(ContainerBuilder $contain
return;
}

$config = $container->getExtensionConfig('enqueue');

if (!empty($config)) {
$processedConfig = $this->processConfiguration(new Configuration(false), $config);

foreach ($processedConfig as $name => $modules) {
if (isset($modules['job']) && false === $modules['job']['default_mapping']) {
return;
}
}
}

foreach ($container->getExtensionConfig('doctrine') as $config) {
// do not register mappings if dbal not configured.
if (!empty($config['dbal'])) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/enqueue-bundle/Tests/Functional/App/AppKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ public function getLogDir()
return sys_get_temp_dir().'/EnqueueBundle/cache/logs';
}

/**
* @param \Symfony\Component\Config\Loader\LoaderInterface $loader
*/
public function registerContainerConfiguration(LoaderInterface $loader)
{
$loader->load(__DIR__.'/config/config.yml');
Expand Down
11 changes: 1 addition & 10 deletions pkg/job-queue/CalculateRootJobStatusService.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,12 @@ class CalculateRootJobStatusService
*/
private $jobStorage;

/**
* @param JobStorage $jobStorage
*/
public function __construct(JobStorage $jobStorage)
{
$this->jobStorage = $jobStorage;
}

/**
* @param Job $job
*
* @return bool true if root job was stopped
*/
public function calculate(Job $job)
Expand Down Expand Up @@ -91,11 +86,7 @@ protected function calculateRootJobStatus(array $jobs)
$success++;
break;
default:
throw new \LogicException(sprintf(
'Got unsupported job status: id: "%s" status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Got unsupported job status: id: "%s" status: "%s"', $job->getId(), $job->getStatus()));
}
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/job-queue/DependentJobContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ class DependentJobContext
*/
private $dependentJobs;

/**
* @param Job $job
*/
public function __construct(Job $job)
{
$this->job = $job;
Expand Down
5 changes: 0 additions & 5 deletions pkg/job-queue/DependentJobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ class DependentJobProcessor implements Processor, TopicSubscriberInterface
*/
private $logger;

/**
* @param JobStorage $jobStorage
* @param ProducerInterface $producer
* @param LoggerInterface $logger
*/
public function __construct(JobStorage $jobStorage, ProducerInterface $producer, LoggerInterface $logger)
{
$this->jobStorage = $jobStorage;
Expand Down
10 changes: 1 addition & 9 deletions pkg/job-queue/DependentJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,17 @@ public function __construct(JobStorage $jobStorage)
}

/**
* @param Job $job
*
* @return DependentJobContext
*/
public function createDependentJobContext(Job $job)
{
return new DependentJobContext($job);
}

/**
* @param DependentJobContext $context
*/
public function saveDependentJob(DependentJobContext $context)
{
if (!$context->getJob()->isRoot()) {
throw new \LogicException(sprintf(
'Only root jobs allowed but got child. jobId: "%s"',
$context->getJob()->getId()
));
throw new \LogicException(sprintf('Only root jobs allowed but got child. jobId: "%s"', $context->getJob()->getId()));
}

$this->jobStorage->saveJob($context->getJob(), function (Job $job) use ($context) {
Expand Down
9 changes: 0 additions & 9 deletions pkg/job-queue/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ public function getCreatedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $createdAt
*/
public function setCreatedAt(\DateTime $createdAt)
{
Expand All @@ -258,8 +256,6 @@ public function getStartedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $startedAt
*/
public function setStartedAt(\DateTime $startedAt)
{
Expand All @@ -279,8 +275,6 @@ public function getStoppedAt()
* Do not call from the outside.
*
* @internal
*
* @param \DateTime $stoppedAt
*/
public function setStoppedAt(\DateTime $stoppedAt)
{
Expand Down Expand Up @@ -324,9 +318,6 @@ public function getData()
return $this->data;
}

/**
* @param array $data
*/
public function setData(array $data)
{
$this->data = $data;
Expand Down
46 changes: 4 additions & 42 deletions pkg/job-queue/JobProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ class JobProcessor
*/
private $producer;

/**
* @param JobStorage $jobStorage
* @param ProducerInterface $producer
*/
public function __construct(JobStorage $jobStorage, ProducerInterface $producer)
{
$this->jobStorage = $jobStorage;
Expand Down Expand Up @@ -74,7 +70,6 @@ public function findOrCreateRootJob($ownerId, $jobName, $unique = false)

/**
* @param string $jobName
* @param Job $rootJob
*
* @return Job
*/
Expand Down Expand Up @@ -104,9 +99,6 @@ public function findOrCreateChildJob($jobName, Job $rootJob)
return $job;
}

/**
* @param Job $job
*/
public function startChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -116,11 +108,7 @@ public function startChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_NEW !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can start only new jobs: id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can start only new jobs: id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_RUNNING);
Expand All @@ -131,9 +119,6 @@ public function startChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function successChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -143,11 +128,7 @@ public function successChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_RUNNING !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can success only running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can success only running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_SUCCESS);
Expand All @@ -158,9 +139,6 @@ public function successChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function failChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -170,11 +148,7 @@ public function failChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (Job::STATUS_RUNNING !== $job->getStatus()) {
throw new \LogicException(sprintf(
'Can fail only running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can fail only running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_FAILED);
Expand All @@ -185,9 +159,6 @@ public function failChildJob(Job $job)
$this->sendCalculateRootJobStatusEvent($job);
}

/**
* @param Job $job
*/
public function cancelChildJob(Job $job)
{
if ($job->isRoot()) {
Expand All @@ -197,11 +168,7 @@ public function cancelChildJob(Job $job)
$job = $this->jobStorage->findJobById($job->getId());

if (!in_array($job->getStatus(), [Job::STATUS_NEW, Job::STATUS_RUNNING], true)) {
throw new \LogicException(sprintf(
'Can cancel only new or running jobs. id: "%s", status: "%s"',
$job->getId(),
$job->getStatus()
));
throw new \LogicException(sprintf('Can cancel only new or running jobs. id: "%s", status: "%s"', $job->getId(), $job->getStatus()));
}

$job->setStatus(Job::STATUS_CANCELLED);
Expand All @@ -217,7 +184,6 @@ public function cancelChildJob(Job $job)
}

/**
* @param Job $job
* @param bool $force
*/
public function interruptRootJob(Job $job, $force = false)
Expand Down Expand Up @@ -245,8 +211,6 @@ public function interruptRootJob(Job $job, $force = false)

/**
* @see https://github.com/php-enqueue/enqueue-dev/pull/222#issuecomment-336102749 See for rationale
*
* @param Job $job
*/
protected function saveJob(Job $job)
{
Expand All @@ -255,8 +219,6 @@ protected function saveJob(Job $job)

/**
* @see https://github.com/php-enqueue/enqueue-dev/pull/222#issuecomment-336102749 See for rationale
*
* @param Job $job
*/
protected function sendCalculateRootJobStatusEvent(Job $job)
{
Expand Down
20 changes: 6 additions & 14 deletions pkg/job-queue/JobRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ class JobRunner
private $rootJob;

/**
* @param JobProcessor $jobProcessor
* @param Job $rootJob
* @param Job $rootJob
*/
public function __construct(JobProcessor $jobProcessor, Job $rootJob = null)
{
Expand All @@ -25,9 +24,8 @@ public function __construct(JobProcessor $jobProcessor, Job $rootJob = null)
}

/**
* @param string $ownerId
* @param string $name
* @param callable $runCallback
* @param string $ownerId
* @param string $name
*
* @throws \Throwable|\Exception if $runCallback triggers an exception
*
Expand All @@ -54,11 +52,7 @@ public function runUnique($ownerId, $name, callable $runCallback)
try {
$this->jobProcessor->failChildJob($childJob);
} catch (\Throwable $t) {
throw new OrphanJobException(sprintf(
'Job cleanup failed. ID: "%s" Name: "%s"',
$childJob->getId(),
$childJob->getName()
), 0, $e);
throw new OrphanJobException(sprintf('Job cleanup failed. ID: "%s" Name: "%s"', $childJob->getId(), $childJob->getName()), 0, $e);
}

throw $e;
Expand All @@ -74,8 +68,7 @@ public function runUnique($ownerId, $name, callable $runCallback)
}

/**
* @param string $name
* @param callable $startCallback
* @param string $name
*
* @return mixed
*/
Expand All @@ -89,8 +82,7 @@ public function createDelayed($name, callable $startCallback)
}

/**
* @param string $jobId
* @param callable $runCallback
* @param string $jobId
*
* @return mixed
*/
Expand Down
3 changes: 0 additions & 3 deletions pkg/job-queue/Test/DbalPersistedConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ protected function persistTransactionNestingLevel($level)
static::$persistedTransactionNestingLevels[$this->getConnectionId()] = $level;
}

/**
* @param DriverConnection $connection
*/
protected function persistConnection(DriverConnection $connection)
{
static::$persistedConnections[$this->getConnectionId()] = $connection;
Expand Down
3 changes: 0 additions & 3 deletions pkg/job-queue/Tests/Functional/app/AppKernel.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ public function getLogDir()
return sys_get_temp_dir().'/EnqueueJobQueue/cache/logs';
}

/**
* @param \Symfony\Component\Config\Loader\LoaderInterface $loader
*/
public function registerContainerConfiguration(LoaderInterface $loader)
{
$loader->load(__DIR__.'/config/config.yml');
Expand Down