diff --git a/patches.json b/patches.json
index 3365a43..a64fce0 100644
--- a/patches.json
+++ b/patches.json
@@ -59,6 +59,15 @@
"2.2.2 - 2.2.3": "MAGECLOUD-1607__overhaul_cron_implementation__2.2.2.patch",
"2.2.4": "MAGECLOUD-1607__overhaul_cron_implementation__2.2.4.patch"
},
+ "Fix cron deadlocks and improve cron locking": {
+ "2.2.5 - 2.2.8": "MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.5.patch",
+ "2.2.9": "MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.9.patch",
+ ">=2.2.10 <2.3.0": "MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.10.patch",
+ "2.3.0": "MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.0.patch",
+ "2.3.1": "MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.1.patch",
+ ">=2.3.2 <2.3.3": "MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.2.patch",
+ ">=2.3.3 <2.3.5": "MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.3.patch"
+ },
"Add Zookeeper and flock locks": {
"2.2.5 - 2.2.8": "MAGECLOUD-3054__add_zookeeper_and_flock_locks__2.2.5.patch",
"2.3.0 - 2.3.1": "MAGECLOUD-3054__add_zookeeper_and_flock_locks__2.3.0.patch"
diff --git a/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.10.patch b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.10.patch
new file mode 100644
index 0000000..4556b0b
--- /dev/null
+++ b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.10.patch
@@ -0,0 +1,700 @@
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrier.php b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+@@ -0,0 +1,39 @@
++getTransactionLevel() !== 0) {
++ return $callback();
++ }
++
++ for ($retries = self::MAX_RETRIES - 1; $retries > 0; $retries--) {
++ try {
++ return $callback();
++ } catch (DeadlockException $e) {
++ continue;
++ }
++ }
++
++ return $callback();
++ }
++}
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+@@ -0,0 +1,33 @@
++timezoneConverter = $timezoneConverter ?: ObjectManager::getInstance()->get(TimezoneInterface::class);
+ $this->dateTimeFactory = $dateTimeFactory ?: ObjectManager::getInstance()->get(DateTimeFactory::class);
++ $this->retrier = $retrier ?: ObjectManager::getInstance()->get(DeadlockRetrierInterface::class);
+ }
+
+ /**
+- * @return void
++ * @inheritdoc
+ */
+ public function _construct()
+ {
+@@ -88,6 +96,8 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
++ * Set cron expression.
++ *
+ * @param string $expr
+ * @return $this
+ * @throws \Magento\Framework\Exception\CronException
+@@ -95,7 +105,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ public function setCronExpr($expr)
+ {
+ $e = preg_split('#\s+#', $expr, null, PREG_SPLIT_NO_EMPTY);
+- if (sizeof($e) < 5 || sizeof($e) > 6) {
++ if (count($e) < 5 || count($e) > 6) {
+ throw new CronException(__('Invalid cron expression: %1', $expr));
+ }
+
+@@ -104,7 +114,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Checks the observer's cron expression against time
++ * Checks the observer's cron expression against time.
+ *
+ * Supports $this->setCronExpr('* 0-5,10-59/5 2-10,15-25 january-june/2 mon-fri')
+ *
+@@ -137,6 +147,8 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
++ * Match cron expression.
++ *
+ * @param string $expr
+ * @param int $num
+ * @return bool
+@@ -164,7 +176,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ // handle modulus
+ if (strpos($expr, '/') !== false) {
+ $e = explode('/', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'match/modulus\': %1', $expr));
+ }
+ if (!is_numeric($e[1])) {
+@@ -183,7 +195,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ } elseif (strpos($expr, '-') !== false) {
+ // handle range
+ $e = explode('-', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'from-to\' structure: %1', $expr));
+ }
+
+@@ -203,6 +215,8 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
++ * Get number of a month.
++ *
+ * @param int|string $value
+ * @return bool|int|string
+ */
+@@ -245,21 +259,42 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Lock the cron job so no other scheduled instances run simultaneously.
++ * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING.
+ *
+- * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING
+- * and no other jobs of the same code are currently in STATUS_RUNNING.
+ * Returns true if status was changed and false otherwise.
+ *
+ * @return boolean
+ */
+ public function tryLockJob()
+ {
+- if ($this->_getResource()->trySetJobUniqueStatusAtomic(
+- $this->getId(),
+- self::STATUS_RUNNING,
+- self::STATUS_PENDING
+- )) {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_getResource();
++
++ // Change statuses from running to error for terminated jobs
++ $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->getConnection()->update(
++ $scheduleResource->getTable('cron_schedule'),
++ ['status' => self::STATUS_ERROR],
++ ['job_code = ?' => $this->getJobCode(), 'status = ?' => self::STATUS_RUNNING]
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ // Change status from pending to running for ran jobs
++ $result = $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->trySetJobStatusAtomic(
++ $this->getId(),
++ self::STATUS_RUNNING,
++ self::STATUS_PENDING
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ if ($result) {
+ $this->setStatus(self::STATUS_RUNNING);
+ return true;
+ }
+diff -Naur a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+--- a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
++++ b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+@@ -3,6 +3,7 @@
+ * Copyright © Magento, Inc. All rights reserved.
+ * See COPYING.txt for license details.
+ */
++
+ /**
+ * Handling cron jobs
+ */
+@@ -14,9 +15,13 @@ use Magento\Framework\Console\Cli;
+ use Magento\Framework\Event\ObserverInterface;
+ use Magento\Framework\Profiler\Driver\Standard\Stat;
+ use Magento\Framework\Profiler\Driver\Standard\StatFactory;
++use Magento\Cron\Model\DeadlockRetrierInterface;
+
+ /**
++ * The observer for processing cron jobs.
++ *
+ * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
++ * @SuppressWarnings(PHPMD.TooManyFields)
+ */
+ class ProcessCronQueueObserver implements ObserverInterface
+ {
+@@ -60,12 +65,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * How long to wait for cron group to become unlocked
+ */
+- const LOCK_TIMEOUT = 5;
++ const LOCK_TIMEOUT = 60;
+
+ /**
+ * Static lock prefix for cron group locking
+ */
+- const LOCK_PREFIX = 'CRON_GROUP_';
++ const LOCK_PREFIX = 'CRON_';
++
++ /**
++ * Max retries for acquire locks for cron jobs
++ */
++ const MAX_RETRIES = 5;
+
+ /**
+ * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
+@@ -142,6 +152,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private $statProfiler;
+
++ /**
++ * @var \Magento\Framework\Event\ManagerInterface
++ */
++ private $eventManager;
++
++ /**
++ * @var DeadlockRetrierInterface
++ */
++ private $retrier;
++
+ /**
+ * @param \Magento\Framework\ObjectManagerInterface $objectManager
+ * @param \Magento\Cron\Model\ScheduleFactory $scheduleFactory
+@@ -153,8 +173,11 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param \Magento\Framework\Stdlib\DateTime\DateTime $dateTime
+ * @param \Magento\Framework\Process\PhpExecutableFinderFactory $phpExecutableFinderFactory
+ * @param \Psr\Log\LoggerInterface $logger
+- * @param \Magento\Framework\App\State $state
++ * @param State $state
+ * @param StatFactory $statFactory
++ * @param \Magento\Framework\Lock\LockManagerInterface $lockManager
++ * @param \Magento\Framework\Event\ManagerInterface $eventManager
++ * @param DeadlockRetrierInterface $retrier
+ * @SuppressWarnings(PHPMD.ExcessiveParameterList)
+ */
+ public function __construct(
+@@ -170,7 +193,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ \Psr\Log\LoggerInterface $logger,
+ \Magento\Framework\App\State $state,
+ StatFactory $statFactory,
+- \Magento\Framework\Lock\LockManagerInterface $lockManager
++ \Magento\Framework\Lock\LockManagerInterface $lockManager,
++ \Magento\Framework\Event\ManagerInterface $eventManager,
++ DeadlockRetrierInterface $retrier
+ ) {
+ $this->_objectManager = $objectManager;
+ $this->_scheduleFactory = $scheduleFactory;
+@@ -185,6 +210,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->state = $state;
+ $this->statProfiler = $statFactory->create();
+ $this->lockManager = $lockManager;
++ $this->eventManager = $eventManager;
++ $this->retrier = $retrier;
+ }
+
+ /**
+@@ -232,12 +259,12 @@ class ProcessCronQueueObserver implements ObserverInterface
+
+ $this->lockGroup(
+ $groupId,
+- function ($groupId) use ($currentTime, $jobsRoot) {
++ function ($groupId) use ($currentTime) {
+ $this->cleanupJobs($groupId, $currentTime);
+ $this->generateSchedules($groupId);
+- $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ );
++ $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ }
+
+@@ -306,9 +333,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ );
+ }
+
+- $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()))->save();
++ $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()));
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+
+ $this->startProfiling();
++ $this->eventManager->dispatch('cron_job_run', ['job_name' => 'cron/' . $groupId . '/' . $jobCode]);
++
+ try {
+ $this->logger->info(sprintf('Cron Job %s is run', $jobCode));
+ //phpcs:ignore Magento2.Functions.DiscouragedFunction
+@@ -387,8 +422,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Return job collection from data base with status 'pending'
++ * Return job collection from data base with status 'pending'.
+ *
++ * @param string $groupId
+ * @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
+ */
+ private function getPendingSchedules($groupId)
+@@ -400,28 +436,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ return $pendingJobs;
+ }
+
+- /**
+- * Return job collection from database with status 'pending', 'running' or 'success'
+- *
+- * @param string $groupId
+- * @return \Magento\Framework\Model\ResourceModel\Db\Collection\AbstractCollection
+- */
+- private function getNonExitedSchedules($groupId)
+- {
+- $jobs = $this->_config->getJobs();
+- $pendingJobs = $this->_scheduleFactory->create()->getCollection();
+- $pendingJobs->addFieldToFilter(
+- 'status',
+- [
+- 'in' => [
+- Schedule::STATUS_PENDING, Schedule::STATUS_RUNNING, Schedule::STATUS_SUCCESS
+- ]
+- ]
+- );
+- $pendingJobs->addFieldToFilter('job_code', ['in' => array_keys($jobs[$groupId])]);
+- return $pendingJobs;
+- }
+-
+ /**
+ * Generate cron schedule
+ *
+@@ -453,7 +467,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ null
+ );
+
+- $schedules = $this->getNonExitedSchedules($groupId);
++ $schedules = $this->getPendingSchedules($groupId);
+ $exists = [];
+ /** @var Schedule $schedule */
+ foreach ($schedules as $schedule) {
+@@ -495,8 +509,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * Clean expired jobs
+ *
+- * @param $groupId
+- * @param $currentTime
++ * @param string $groupId
++ * @param int $currentTime
+ * @return void
+ */
+ private function cleanupJobs($groupId, $currentTime)
+@@ -527,16 +541,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ];
+
+ $jobs = $this->_config->getJobs()[$groupId];
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $connection = $scheduleResource->getConnection();
+ $count = 0;
+ foreach ($historyLifetimes as $status => $time) {
+- $count += $connection->delete(
+- $scheduleResource->getMainTable(),
++ $count += $this->cleanup(
+ [
+ 'status = ?' => $status,
+ 'job_code in (?)' => array_keys($jobs),
+- 'created_at < ?' => $connection->formatDate($currentTime - $time)
++ 'created_at < ?' => $this->_scheduleFactory
++ ->create()
++ ->getResource()
++ ->getConnection()
++ ->formatDate($currentTime - $time)
+ ]
+ );
+ }
+@@ -547,6 +562,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get config of schedule.
++ *
+ * @param array $jobConfig
+ * @return mixed
+ */
+@@ -561,6 +578,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Save a schedule of cron job.
++ *
+ * @param string $jobCode
+ * @param string $cronExpression
+ * @param int $timeInterval
+@@ -593,6 +612,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Create a schedule of cron job.
++ *
+ * @param string $jobCode
+ * @param string $cronExpression
+ * @param int $time
+@@ -611,6 +632,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get time interval for scheduling.
++ *
+ * @param string $groupId
+ * @return int
+ */
+@@ -623,8 +646,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Clean up scheduled jobs that are disabled in the configuration
+- * This can happen when you turn off a cron job in the config and flush the cache
++ * Clean up scheduled jobs that are disabled in the configuration.
++ *
++ * This can happen when you turn off a cron job in the config and flush the cache.
+ *
+ * @param string $groupId
+ * @return void
+@@ -641,9 +665,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ if (count($jobsToCleanup) > 0) {
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $count = $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $count = $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code in (?)' => $jobsToCleanup,
+@@ -655,6 +677,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get cron expression of cron job.
++ *
+ * @param array $jobConfig
+ * @return null|string
+ */
+@@ -674,18 +698,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Clean up scheduled jobs that do not match their cron expression anymore
+- * This can happen when you change the cron expression and flush the cache
++ * Clean up scheduled jobs that do not match their cron expression anymore.
++ *
++ * This can happen when you change the cron expression and flush the cache.
+ *
+ * @return $this
+ */
+ private function cleanupScheduleMismatches()
+ {
+- /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+ foreach ($this->invalid as $jobCode => $scheduledAtList) {
+- $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code = ?' => $jobCode,
+@@ -693,13 +715,15 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ]
+ );
+ }
++
+ return $this;
+ }
+
+ /**
+- * Get CronGroup Configuration Value
++ * Get CronGroup Configuration Value.
+ *
+- * @param $groupId
++ * @param string $groupId
++ * @param string $path
+ * @return int
+ */
+ private function getCronGroupConfigurationValue($groupId, $path)
+@@ -711,9 +735,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Is Group In Filter
++ * Is Group In Filter.
+ *
+- * @param $groupId
++ * @param string $groupId
+ * @return bool
+ */
+ private function isGroupInFilter($groupId): bool
+@@ -723,17 +747,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Process pending jobs
++ * Process pending jobs.
+ *
+- * @param $groupId
+- * @param $jobsRoot
+- * @param $currentTime
++ * @param string $groupId
++ * @param array $jobsRoot
++ * @param int $currentTime
+ */
+ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
+ {
+ $procesedJobs = [];
+ $pendingJobs = $this->getPendingSchedules($groupId);
+- /** @var \Magento\Cron\Model\Schedule $schedule */
++ /** @var Schedule $schedule */
+ foreach ($pendingJobs as $schedule) {
+ if (isset($procesedJobs[$schedule->getJobCode()])) {
+ // process only on job per run
+@@ -749,26 +773,59 @@ class ProcessCronQueueObserver implements ObserverInterface
+ continue;
+ }
+
+- try {
+- if ($schedule->tryLockJob()) {
+- $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
+- }
+- } catch (\Exception $e) {
+- $this->processError($schedule, $e);
+- }
++ $this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++
+ if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
+ $procesedJobs[$schedule->getJobCode()] = true;
+ }
+- $schedule->save();
++
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+ }
+ }
+
+ /**
++ * Try to acquire lock for cron job and try to run this job.
++ *
++ * @param int $scheduledTime
++ * @param int $currentTime
++ * @param string[] $jobConfig
++ * @param Schedule $schedule
++ * @param string $groupId
++ */
++ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)
++ {
++ // use sha1 to limit length
++ // phpcs:ignore Magento2.Security.InsecureFunction
++ $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
++
++ try {
++ for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
++ if ($this->lockManager->lock($lockName, 0) && $schedule->tryLockJob()) {
++ $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++ break;
++ }
++ $this->logger->warning("Could not acquire lock for cron job: {$schedule->getJobCode()}");
++ }
++ } catch (\Exception $e) {
++ $this->processError($schedule, $e);
++ } finally {
++ $this->lockManager->unlock($lockName);
++ }
++ }
++
++ /**
++ * Process error messages.
++ *
+ * @param Schedule $schedule
+ * @param \Exception $exception
+ * @return void
+ */
+- private function processError(\Magento\Cron\Model\Schedule $schedule, \Exception $exception)
++ private function processError(Schedule $schedule, \Exception $exception)
+ {
+ $schedule->setMessages($exception->getMessage());
+ if ($schedule->getStatus() === Schedule::STATUS_ERROR) {
+@@ -780,4 +837,26 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->logger->info($schedule->getMessages());
+ }
+ }
++
++ /**
++ * Clean up schedule
++ *
++ * @param mixed $where
++ * @return int
++ */
++ private function cleanup($where = ''): int
++ {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_scheduleFactory->create()->getResource();
++
++ return (int) $this->retrier->execute(
++ function () use ($scheduleResource, $where) {
++ return $scheduleResource->getConnection()->delete(
++ $scheduleResource->getTable('cron_schedule'),
++ $where
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++ }
+ }
+diff -Naur a/vendor/magento/module-cron/etc/di.xml b/vendor/magento/module-cron/etc/di.xml
+--- a/vendor/magento/module-cron/etc/di.xml
++++ b/vendor/magento/module-cron/etc/di.xml
+@@ -76,4 +76,5 @@
+
+
+
++
+
diff --git a/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.5.patch b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.5.patch
new file mode 100644
index 0000000..26b455b
--- /dev/null
+++ b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.5.patch
@@ -0,0 +1,794 @@
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrier.php b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+@@ -0,0 +1,39 @@
++getTransactionLevel() !== 0) {
++ return $callback();
++ }
++
++ for ($retries = self::MAX_RETRIES - 1; $retries > 0; $retries--) {
++ try {
++ return $callback();
++ } catch (DeadlockException $e) {
++ continue;
++ }
++ }
++
++ return $callback();
++ }
++}
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+@@ -0,0 +1,33 @@
++timezoneConverter = $timezoneConverter ?: ObjectManager::getInstance()->get(TimezoneInterface::class);
++ $this->dateTimeFactory = $dateTimeFactory ?: ObjectManager::getInstance()->get(DateTimeFactory::class);
++ $this->retrier = $retrier ?: ObjectManager::getInstance()->get(DeadlockRetrierInterface::class);
+ }
+
+ /**
+- * @return void
++ * @inheritdoc
+ */
+ public function _construct()
+ {
+@@ -79,6 +96,8 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
++ * Set cron expression.
++ *
+ * @param string $expr
+ * @return $this
+ * @throws \Magento\Framework\Exception\CronException
+@@ -86,7 +105,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ public function setCronExpr($expr)
+ {
+ $e = preg_split('#\s+#', $expr, null, PREG_SPLIT_NO_EMPTY);
+- if (sizeof($e) < 5 || sizeof($e) > 6) {
++ if (count($e) < 5 || count($e) > 6) {
+ throw new CronException(__('Invalid cron expression: %1', $expr));
+ }
+
+@@ -95,7 +114,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Checks the observer's cron expression against time
++ * Checks the observer's cron expression against time.
+ *
+ * Supports $this->setCronExpr('* 0-5,10-59/5 2-10,15-25 january-june/2 mon-fri')
+ *
+@@ -109,22 +128,27 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ if (!$e || !$time) {
+ return false;
+ }
++ $configTimeZone = $this->timezoneConverter->getConfigTimezone();
++ $storeDateTime = $this->dateTimeFactory->create(null, new \DateTimeZone($configTimeZone));
+ if (!is_numeric($time)) {
+ //convert time from UTC to admin store timezone
+ //we assume that all schedules in configuration (crontab.xml and DB tables) are in admin store timezone
+- $time = $this->timezoneConverter->date($time)->format('Y-m-d H:i');
+- $time = strtotime($time);
++ $dateTimeUtc = $this->dateTimeFactory->create($time);
++ $time = $dateTimeUtc->getTimestamp();
+ }
+- $match = $this->matchCronExpression($e[0], strftime('%M', $time))
+- && $this->matchCronExpression($e[1], strftime('%H', $time))
+- && $this->matchCronExpression($e[2], strftime('%d', $time))
+- && $this->matchCronExpression($e[3], strftime('%m', $time))
+- && $this->matchCronExpression($e[4], strftime('%w', $time));
++ $time = $storeDateTime->setTimestamp($time);
++ $match = $this->matchCronExpression($e[0], $time->format('i'))
++ && $this->matchCronExpression($e[1], $time->format('H'))
++ && $this->matchCronExpression($e[2], $time->format('d'))
++ && $this->matchCronExpression($e[3], $time->format('m'))
++ && $this->matchCronExpression($e[4], $time->format('w'));
+
+ return $match;
+ }
+
+ /**
++ * Match cron expression.
++ *
+ * @param string $expr
+ * @param int $num
+ * @return bool
+@@ -152,7 +176,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ // handle modulus
+ if (strpos($expr, '/') !== false) {
+ $e = explode('/', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'match/modulus\': %1', $expr));
+ }
+ if (!is_numeric($e[1])) {
+@@ -171,7 +195,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ } elseif (strpos($expr, '-') !== false) {
+ // handle range
+ $e = explode('-', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'from-to\' structure: %1', $expr));
+ }
+
+@@ -191,6 +215,8 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
++ * Get number of a month.
++ *
+ * @param int|string $value
+ * @return bool|int|string
+ */
+@@ -233,21 +259,42 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Lock the cron job so no other scheduled instances run simultaneously.
++ * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING.
+ *
+- * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING
+- * and no other jobs of the same code are currently in STATUS_RUNNING.
+ * Returns true if status was changed and false otherwise.
+ *
+ * @return boolean
+ */
+ public function tryLockJob()
+ {
+- if ($this->_getResource()->trySetJobUniqueStatusAtomic(
+- $this->getId(),
+- self::STATUS_RUNNING,
+- self::STATUS_PENDING
+- )) {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_getResource();
++
++ // Change statuses from running to error for terminated jobs
++ $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->getConnection()->update(
++ $scheduleResource->getTable('cron_schedule'),
++ ['status' => self::STATUS_ERROR],
++ ['job_code = ?' => $this->getJobCode(), 'status = ?' => self::STATUS_RUNNING]
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ // Change status from pending to running for ran jobs
++ $result = $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->trySetJobStatusAtomic(
++ $this->getId(),
++ self::STATUS_RUNNING,
++ self::STATUS_PENDING
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ if ($result) {
+ $this->setStatus(self::STATUS_RUNNING);
+ return true;
+ }
+diff -Naur a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+--- a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
++++ b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+@@ -9,15 +9,19 @@
+ */
+ namespace Magento\Cron\Observer;
+
++use Magento\Cron\Model\Schedule;
+ use Magento\Framework\App\State;
+ use Magento\Framework\Console\Cli;
+ use Magento\Framework\Event\ObserverInterface;
+-use \Magento\Cron\Model\Schedule;
+ use Magento\Framework\Profiler\Driver\Standard\Stat;
+ use Magento\Framework\Profiler\Driver\Standard\StatFactory;
++use Magento\Cron\Model\DeadlockRetrierInterface;
+
+ /**
++ * The observer for processing cron jobs.
++ *
+ * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
++ * @SuppressWarnings(PHPMD.TooManyFields)
+ */
+ class ProcessCronQueueObserver implements ObserverInterface
+ {
+@@ -61,12 +65,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * How long to wait for cron group to become unlocked
+ */
+- const LOCK_TIMEOUT = 5;
++ const LOCK_TIMEOUT = 60;
+
+ /**
+ * Static lock prefix for cron group locking
+ */
+- const LOCK_PREFIX = 'CRON_GROUP_';
++ const LOCK_PREFIX = 'CRON_';
++
++ /**
++ * Max retries for acquire locks for cron jobs
++ */
++ const MAX_RETRIES = 5;
+
+ /**
+ * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
+@@ -143,6 +152,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private $statProfiler;
+
++ /**
++ * @var \Magento\Framework\Event\ManagerInterface
++ */
++ private $eventManager;
++
++ /**
++ * @var DeadlockRetrierInterface
++ */
++ private $retrier;
++
+ /**
+ * @param \Magento\Framework\ObjectManagerInterface $objectManager
+ * @param \Magento\Cron\Model\ScheduleFactory $scheduleFactory
+@@ -154,8 +173,11 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param \Magento\Framework\Stdlib\DateTime\DateTime $dateTime
+ * @param \Magento\Framework\Process\PhpExecutableFinderFactory $phpExecutableFinderFactory
+ * @param \Psr\Log\LoggerInterface $logger
+- * @param \Magento\Framework\App\State $state
++ * @param State $state
+ * @param StatFactory $statFactory
++ * @param \Magento\Framework\Lock\LockManagerInterface $lockManager
++ * @param \Magento\Framework\Event\ManagerInterface $eventManager
++ * @param DeadlockRetrierInterface $retrier
+ * @SuppressWarnings(PHPMD.ExcessiveParameterList)
+ */
+ public function __construct(
+@@ -171,7 +193,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ \Psr\Log\LoggerInterface $logger,
+ \Magento\Framework\App\State $state,
+ StatFactory $statFactory,
+- \Magento\Framework\Lock\LockManagerInterface $lockManager
++ \Magento\Framework\Lock\LockManagerInterface $lockManager,
++ \Magento\Framework\Event\ManagerInterface $eventManager,
++ DeadlockRetrierInterface $retrier
+ ) {
+ $this->_objectManager = $objectManager;
+ $this->_scheduleFactory = $scheduleFactory;
+@@ -186,6 +210,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->state = $state;
+ $this->statProfiler = $statFactory->create();
+ $this->lockManager = $lockManager;
++ $this->eventManager = $eventManager;
++ $this->retrier = $retrier;
+ }
+
+ /**
+@@ -201,7 +227,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ public function execute(\Magento\Framework\Event\Observer $observer)
+ {
+-
+ $currentTime = $this->dateTime->gmtTimestamp();
+ $jobGroupsRoot = $this->_config->getJobs();
+ // sort jobs groups to start from used in separated process
+@@ -234,12 +259,12 @@ class ProcessCronQueueObserver implements ObserverInterface
+
+ $this->lockGroup(
+ $groupId,
+- function ($groupId) use ($currentTime, $jobsRoot) {
++ function ($groupId) use ($currentTime) {
+ $this->cleanupJobs($groupId, $currentTime);
+ $this->generateSchedules($groupId);
+- $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ );
++ $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ }
+
+@@ -255,7 +280,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function lockGroup($groupId, callable $callback)
+ {
+-
+ if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
+ $this->logger->warning(
+ sprintf(
+@@ -290,36 +314,50 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $scheduleLifetime = $scheduleLifetime * self::SECONDS_IN_MINUTE;
+ if ($scheduledTime < $currentTime - $scheduleLifetime) {
+ $schedule->setStatus(Schedule::STATUS_MISSED);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(sprintf('Cron Job %s is missed at %s', $jobCode, $schedule->getScheduledAt()));
+ }
+
+ if (!isset($jobConfig['instance'], $jobConfig['method'])) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
+- throw new \Exception('No callbacks found');
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
++ throw new \Exception(sprintf('No callbacks found for cron job %s', $jobCode));
+ }
+ $model = $this->_objectManager->create($jobConfig['instance']);
+ $callback = [$model, $jobConfig['method']];
+ if (!is_callable($callback)) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(
+ sprintf('Invalid callback: %s::%s can\'t be called', $jobConfig['instance'], $jobConfig['method'])
+ );
+ }
+
+- $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()))->save();
++ $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()));
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+
+ $this->startProfiling();
++ $this->eventManager->dispatch('cron_job_run', ['job_name' => 'cron/' . $groupId . '/' . $jobCode]);
++
+ try {
+ $this->logger->info(sprintf('Cron Job %s is run', $jobCode));
++ //phpcs:ignore Magento2.Functions.DiscouragedFunction
+ call_user_func_array($callback, [$schedule]);
+ } catch (\Throwable $e) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
+- $this->logger->error(sprintf(
+- 'Cron Job %s has an error: %s. Statistics: %s',
+- $jobCode,
+- $e->getMessage(),
+- $this->getProfilingStat()
+- ));
++ $this->logger->error(
++ sprintf(
++ 'Cron Job %s has an error: %s. Statistics: %s',
++ $jobCode,
++ $e->getMessage(),
++ $this->getProfilingStat()
++ )
++ );
+ if (!$e instanceof \Exception) {
+ $e = new \RuntimeException(
+ 'Error when running a cron job',
+@@ -332,16 +370,22 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->stopProfiling();
+ }
+
+- $schedule->setStatus(Schedule::STATUS_SUCCESS)->setFinishedAt(strftime(
+- '%Y-%m-%d %H:%M:%S',
+- $this->dateTime->gmtTimestamp()
+- ));
++ $schedule->setStatus(
++ Schedule::STATUS_SUCCESS
++ )->setFinishedAt(
++ strftime(
++ '%Y-%m-%d %H:%M:%S',
++ $this->dateTime->gmtTimestamp()
++ )
++ );
+
+- $this->logger->info(sprintf(
+- 'Cron Job %s is successfully finished. Statistics: %s',
+- $jobCode,
+- $this->getProfilingStat()
+- ));
++ $this->logger->info(
++ sprintf(
++ 'Cron Job %s is successfully finished. Statistics: %s',
++ $jobCode,
++ $this->getProfilingStat()
++ )
++ );
+ }
+
+ /**
+@@ -378,8 +422,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Return job collection from data base with status 'pending'
++ * Return job collection from data base with status 'pending'.
+ *
++ * @param string $groupId
+ * @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
+ */
+ private function getPendingSchedules($groupId)
+@@ -464,8 +509,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * Clean expired jobs
+ *
+- * @param $groupId
+- * @param $currentTime
++ * @param string $groupId
++ * @param int $currentTime
+ * @return void
+ */
+ private function cleanupJobs($groupId, $currentTime)
+@@ -496,16 +541,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ];
+
+ $jobs = $this->_config->getJobs()[$groupId];
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $connection = $scheduleResource->getConnection();
+ $count = 0;
+ foreach ($historyLifetimes as $status => $time) {
+- $count += $connection->delete(
+- $scheduleResource->getMainTable(),
++ $count += $this->cleanup(
+ [
+ 'status = ?' => $status,
+ 'job_code in (?)' => array_keys($jobs),
+- 'created_at < ?' => $connection->formatDate($currentTime - $time)
++ 'created_at < ?' => $this->_scheduleFactory
++ ->create()
++ ->getResource()
++ ->getConnection()
++ ->formatDate($currentTime - $time)
+ ]
+ );
+ }
+@@ -516,6 +562,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get config of schedule.
++ *
+ * @param array $jobConfig
+ * @return mixed
+ */
+@@ -530,6 +578,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Save a schedule of cron job.
++ *
+ * @param string $jobCode
+ * @param string $cronExpression
+ * @param int $timeInterval
+@@ -562,6 +612,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Create a schedule of cron job.
++ *
+ * @param string $jobCode
+ * @param string $cronExpression
+ * @param int $time
+@@ -580,6 +632,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get time interval for scheduling.
++ *
+ * @param string $groupId
+ * @return int
+ */
+@@ -592,8 +646,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Clean up scheduled jobs that are disabled in the configuration
+- * This can happen when you turn off a cron job in the config and flush the cache
++ * Clean up scheduled jobs that are disabled in the configuration.
++ *
++ * This can happen when you turn off a cron job in the config and flush the cache.
+ *
+ * @param string $groupId
+ * @return void
+@@ -610,9 +665,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ if (count($jobsToCleanup) > 0) {
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $count = $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $count = $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code in (?)' => $jobsToCleanup,
+@@ -624,6 +677,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get cron expression of cron job.
++ *
+ * @param array $jobConfig
+ * @return null|string
+ */
+@@ -643,29 +698,32 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Clean up scheduled jobs that do not match their cron expression anymore
+- * This can happen when you change the cron expression and flush the cache
++ * Clean up scheduled jobs that do not match their cron expression anymore.
++ *
++ * This can happen when you change the cron expression and flush the cache.
+ *
+ * @return $this
+ */
+ private function cleanupScheduleMismatches()
+ {
+- /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+ foreach ($this->invalid as $jobCode => $scheduledAtList) {
+- $scheduleResource->getConnection()->delete($scheduleResource->getMainTable(), [
+- 'status = ?' => Schedule::STATUS_PENDING,
+- 'job_code = ?' => $jobCode,
+- 'scheduled_at in (?)' => $scheduledAtList,
+- ]);
++ $this->cleanup(
++ [
++ 'status = ?' => Schedule::STATUS_PENDING,
++ 'job_code = ?' => $jobCode,
++ 'scheduled_at in (?)' => $scheduledAtList,
++ ]
++ );
+ }
++
+ return $this;
+ }
+
+ /**
+- * Get CronGroup Configuration Value
++ * Get CronGroup Configuration Value.
+ *
+- * @param $groupId
++ * @param string $groupId
++ * @param string $path
+ * @return int
+ */
+ private function getCronGroupConfigurationValue($groupId, $path)
+@@ -677,9 +735,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Is Group In Filter
++ * Is Group In Filter.
+ *
+- * @param $groupId
++ * @param string $groupId
+ * @return bool
+ */
+ private function isGroupInFilter($groupId): bool
+@@ -689,17 +747,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Process pending jobs
++ * Process pending jobs.
+ *
+- * @param $groupId
+- * @param $jobsRoot
+- * @param $currentTime
++ * @param string $groupId
++ * @param array $jobsRoot
++ * @param int $currentTime
+ */
+ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
+ {
+ $procesedJobs = [];
+ $pendingJobs = $this->getPendingSchedules($groupId);
+- /** @var \Magento\Cron\Model\Schedule $schedule */
++ /** @var Schedule $schedule */
+ foreach ($pendingJobs as $schedule) {
+ if (isset($procesedJobs[$schedule->getJobCode()])) {
+ // process only on job per run
+@@ -715,26 +773,59 @@ class ProcessCronQueueObserver implements ObserverInterface
+ continue;
+ }
+
+- try {
+- if ($schedule->tryLockJob()) {
+- $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
+- }
+- } catch (\Exception $e) {
+- $this->processError($schedule, $e);
+- }
++ $this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++
+ if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
+ $procesedJobs[$schedule->getJobCode()] = true;
+ }
+- $schedule->save();
++
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+ }
+ }
+
+ /**
++ * Try to acquire lock for cron job and try to run this job.
++ *
++ * @param int $scheduledTime
++ * @param int $currentTime
++ * @param string[] $jobConfig
++ * @param Schedule $schedule
++ * @param string $groupId
++ */
++ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)
++ {
++ // use sha1 to limit length
++ // phpcs:ignore Magento2.Security.InsecureFunction
++ $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
++
++ try {
++ for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
++ if ($this->lockManager->lock($lockName, 0) && $schedule->tryLockJob()) {
++ $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++ break;
++ }
++ $this->logger->warning("Could not acquire lock for cron job: {$schedule->getJobCode()}");
++ }
++ } catch (\Exception $e) {
++ $this->processError($schedule, $e);
++ } finally {
++ $this->lockManager->unlock($lockName);
++ }
++ }
++
++ /**
++ * Process error messages.
++ *
+ * @param Schedule $schedule
+ * @param \Exception $exception
+ * @return void
+ */
+- private function processError(\Magento\Cron\Model\Schedule $schedule, \Exception $exception)
++ private function processError(Schedule $schedule, \Exception $exception)
+ {
+ $schedule->setMessages($exception->getMessage());
+ if ($schedule->getStatus() === Schedule::STATUS_ERROR) {
+@@ -746,4 +837,26 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->logger->info($schedule->getMessages());
+ }
+ }
++
++ /**
++ * Clean up schedule
++ *
++ * @param mixed $where
++ * @return int
++ */
++ private function cleanup($where = ''): int
++ {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_scheduleFactory->create()->getResource();
++
++ return (int) $this->retrier->execute(
++ function () use ($scheduleResource, $where) {
++ return $scheduleResource->getConnection()->delete(
++ $scheduleResource->getTable('cron_schedule'),
++ $where
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++ }
+ }
+diff -Naur a/vendor/magento/module-cron/etc/di.xml b/vendor/magento/module-cron/etc/di.xml
+--- a/vendor/magento/module-cron/etc/di.xml
++++ b/vendor/magento/module-cron/etc/di.xml
+@@ -76,4 +76,5 @@
+
+
+
++
+
diff --git a/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.9.patch b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.9.patch
new file mode 100644
index 0000000..637ea42
--- /dev/null
+++ b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.2.9.patch
@@ -0,0 +1,752 @@
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrier.php b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+@@ -0,0 +1,39 @@
++getTransactionLevel() !== 0) {
++ return $callback();
++ }
++
++ for ($retries = self::MAX_RETRIES - 1; $retries > 0; $retries--) {
++ try {
++ return $callback();
++ } catch (DeadlockException $e) {
++ continue;
++ }
++ }
++
++ return $callback();
++ }
++}
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+@@ -0,0 +1,33 @@
++timezoneConverter = $timezoneConverter ?: ObjectManager::getInstance()->get(TimezoneInterface::class);
+ $this->dateTimeFactory = $dateTimeFactory ?: ObjectManager::getInstance()->get(DateTimeFactory::class);
++ $this->retrier = $retrier ?: ObjectManager::getInstance()->get(DeadlockRetrierInterface::class);
+ }
+
+ /**
+- * @return void
++ * @inheritdoc
+ */
+ public function _construct()
+ {
+@@ -88,6 +96,8 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
++ * Set cron expression.
++ *
+ * @param string $expr
+ * @return $this
+ * @throws \Magento\Framework\Exception\CronException
+@@ -95,7 +105,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ public function setCronExpr($expr)
+ {
+ $e = preg_split('#\s+#', $expr, null, PREG_SPLIT_NO_EMPTY);
+- if (sizeof($e) < 5 || sizeof($e) > 6) {
++ if (count($e) < 5 || count($e) > 6) {
+ throw new CronException(__('Invalid cron expression: %1', $expr));
+ }
+
+@@ -104,7 +114,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Checks the observer's cron expression against time
++ * Checks the observer's cron expression against time.
+ *
+ * Supports $this->setCronExpr('* 0-5,10-59/5 2-10,15-25 january-june/2 mon-fri')
+ *
+@@ -137,6 +147,8 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
++ * Match cron expression.
++ *
+ * @param string $expr
+ * @param int $num
+ * @return bool
+@@ -164,7 +176,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ // handle modulus
+ if (strpos($expr, '/') !== false) {
+ $e = explode('/', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'match/modulus\': %1', $expr));
+ }
+ if (!is_numeric($e[1])) {
+@@ -183,7 +195,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ } elseif (strpos($expr, '-') !== false) {
+ // handle range
+ $e = explode('-', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'from-to\' structure: %1', $expr));
+ }
+
+@@ -203,6 +215,8 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
++ * Get number of a month.
++ *
+ * @param int|string $value
+ * @return bool|int|string
+ */
+@@ -245,21 +259,42 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Lock the cron job so no other scheduled instances run simultaneously.
++ * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING.
+ *
+- * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING
+- * and no other jobs of the same code are currently in STATUS_RUNNING.
+ * Returns true if status was changed and false otherwise.
+ *
+ * @return boolean
+ */
+ public function tryLockJob()
+ {
+- if ($this->_getResource()->trySetJobUniqueStatusAtomic(
+- $this->getId(),
+- self::STATUS_RUNNING,
+- self::STATUS_PENDING
+- )) {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_getResource();
++
++ // Change statuses from running to error for terminated jobs
++ $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->getConnection()->update(
++ $scheduleResource->getTable('cron_schedule'),
++ ['status' => self::STATUS_ERROR],
++ ['job_code = ?' => $this->getJobCode(), 'status = ?' => self::STATUS_RUNNING]
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ // Change status from pending to running for ran jobs
++ $result = $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->trySetJobStatusAtomic(
++ $this->getId(),
++ self::STATUS_RUNNING,
++ self::STATUS_PENDING
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ if ($result) {
+ $this->setStatus(self::STATUS_RUNNING);
+ return true;
+ }
+diff -Naur a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+--- a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
++++ b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+@@ -9,15 +9,19 @@
+ */
+ namespace Magento\Cron\Observer;
+
++use Magento\Cron\Model\Schedule;
+ use Magento\Framework\App\State;
+ use Magento\Framework\Console\Cli;
+ use Magento\Framework\Event\ObserverInterface;
+-use \Magento\Cron\Model\Schedule;
+ use Magento\Framework\Profiler\Driver\Standard\Stat;
+ use Magento\Framework\Profiler\Driver\Standard\StatFactory;
++use Magento\Cron\Model\DeadlockRetrierInterface;
+
+ /**
++ * The observer for processing cron jobs.
++ *
+ * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
++ * @SuppressWarnings(PHPMD.TooManyFields)
+ */
+ class ProcessCronQueueObserver implements ObserverInterface
+ {
+@@ -61,12 +65,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * How long to wait for cron group to become unlocked
+ */
+- const LOCK_TIMEOUT = 5;
++ const LOCK_TIMEOUT = 60;
+
+ /**
+ * Static lock prefix for cron group locking
+ */
+- const LOCK_PREFIX = 'CRON_GROUP_';
++ const LOCK_PREFIX = 'CRON_';
++
++ /**
++ * Max retries for acquire locks for cron jobs
++ */
++ const MAX_RETRIES = 5;
+
+ /**
+ * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
+@@ -143,6 +152,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private $statProfiler;
+
++ /**
++ * @var \Magento\Framework\Event\ManagerInterface
++ */
++ private $eventManager;
++
++ /**
++ * @var DeadlockRetrierInterface
++ */
++ private $retrier;
++
+ /**
+ * @param \Magento\Framework\ObjectManagerInterface $objectManager
+ * @param \Magento\Cron\Model\ScheduleFactory $scheduleFactory
+@@ -154,8 +173,11 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param \Magento\Framework\Stdlib\DateTime\DateTime $dateTime
+ * @param \Magento\Framework\Process\PhpExecutableFinderFactory $phpExecutableFinderFactory
+ * @param \Psr\Log\LoggerInterface $logger
+- * @param \Magento\Framework\App\State $state
++ * @param State $state
+ * @param StatFactory $statFactory
++ * @param \Magento\Framework\Lock\LockManagerInterface $lockManager
++ * @param \Magento\Framework\Event\ManagerInterface $eventManager
++ * @param DeadlockRetrierInterface $retrier
+ * @SuppressWarnings(PHPMD.ExcessiveParameterList)
+ */
+ public function __construct(
+@@ -171,7 +193,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ \Psr\Log\LoggerInterface $logger,
+ \Magento\Framework\App\State $state,
+ StatFactory $statFactory,
+- \Magento\Framework\Lock\LockManagerInterface $lockManager
++ \Magento\Framework\Lock\LockManagerInterface $lockManager,
++ \Magento\Framework\Event\ManagerInterface $eventManager,
++ DeadlockRetrierInterface $retrier
+ ) {
+ $this->_objectManager = $objectManager;
+ $this->_scheduleFactory = $scheduleFactory;
+@@ -186,6 +210,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->state = $state;
+ $this->statProfiler = $statFactory->create();
+ $this->lockManager = $lockManager;
++ $this->eventManager = $eventManager;
++ $this->retrier = $retrier;
+ }
+
+ /**
+@@ -201,7 +227,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ public function execute(\Magento\Framework\Event\Observer $observer)
+ {
+-
+ $currentTime = $this->dateTime->gmtTimestamp();
+ $jobGroupsRoot = $this->_config->getJobs();
+ // sort jobs groups to start from used in separated process
+@@ -234,12 +259,12 @@ class ProcessCronQueueObserver implements ObserverInterface
+
+ $this->lockGroup(
+ $groupId,
+- function ($groupId) use ($currentTime, $jobsRoot) {
++ function ($groupId) use ($currentTime) {
+ $this->cleanupJobs($groupId, $currentTime);
+ $this->generateSchedules($groupId);
+- $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ );
++ $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ }
+
+@@ -255,7 +280,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function lockGroup($groupId, callable $callback)
+ {
+-
+ if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
+ $this->logger->warning(
+ sprintf(
+@@ -290,36 +314,50 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $scheduleLifetime = $scheduleLifetime * self::SECONDS_IN_MINUTE;
+ if ($scheduledTime < $currentTime - $scheduleLifetime) {
+ $schedule->setStatus(Schedule::STATUS_MISSED);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(sprintf('Cron Job %s is missed at %s', $jobCode, $schedule->getScheduledAt()));
+ }
+
+ if (!isset($jobConfig['instance'], $jobConfig['method'])) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
+- throw new \Exception('No callbacks found');
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
++ throw new \Exception(sprintf('No callbacks found for cron job %s', $jobCode));
+ }
+ $model = $this->_objectManager->create($jobConfig['instance']);
+ $callback = [$model, $jobConfig['method']];
+ if (!is_callable($callback)) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(
+ sprintf('Invalid callback: %s::%s can\'t be called', $jobConfig['instance'], $jobConfig['method'])
+ );
+ }
+
+- $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()))->save();
++ $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()));
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+
+ $this->startProfiling();
++ $this->eventManager->dispatch('cron_job_run', ['job_name' => 'cron/' . $groupId . '/' . $jobCode]);
++
+ try {
+ $this->logger->info(sprintf('Cron Job %s is run', $jobCode));
++ //phpcs:ignore Magento2.Functions.DiscouragedFunction
+ call_user_func_array($callback, [$schedule]);
+ } catch (\Throwable $e) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
+- $this->logger->error(sprintf(
+- 'Cron Job %s has an error: %s. Statistics: %s',
+- $jobCode,
+- $e->getMessage(),
+- $this->getProfilingStat()
+- ));
++ $this->logger->error(
++ sprintf(
++ 'Cron Job %s has an error: %s. Statistics: %s',
++ $jobCode,
++ $e->getMessage(),
++ $this->getProfilingStat()
++ )
++ );
+ if (!$e instanceof \Exception) {
+ $e = new \RuntimeException(
+ 'Error when running a cron job',
+@@ -332,16 +370,22 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->stopProfiling();
+ }
+
+- $schedule->setStatus(Schedule::STATUS_SUCCESS)->setFinishedAt(strftime(
+- '%Y-%m-%d %H:%M:%S',
+- $this->dateTime->gmtTimestamp()
+- ));
++ $schedule->setStatus(
++ Schedule::STATUS_SUCCESS
++ )->setFinishedAt(
++ strftime(
++ '%Y-%m-%d %H:%M:%S',
++ $this->dateTime->gmtTimestamp()
++ )
++ );
+
+- $this->logger->info(sprintf(
+- 'Cron Job %s is successfully finished. Statistics: %s',
+- $jobCode,
+- $this->getProfilingStat()
+- ));
++ $this->logger->info(
++ sprintf(
++ 'Cron Job %s is successfully finished. Statistics: %s',
++ $jobCode,
++ $this->getProfilingStat()
++ )
++ );
+ }
+
+ /**
+@@ -378,8 +422,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Return job collection from data base with status 'pending'
++ * Return job collection from data base with status 'pending'.
+ *
++ * @param string $groupId
+ * @return \Magento\Cron\Model\ResourceModel\Schedule\Collection
+ */
+ private function getPendingSchedules($groupId)
+@@ -464,8 +509,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * Clean expired jobs
+ *
+- * @param $groupId
+- * @param $currentTime
++ * @param string $groupId
++ * @param int $currentTime
+ * @return void
+ */
+ private function cleanupJobs($groupId, $currentTime)
+@@ -496,16 +541,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ];
+
+ $jobs = $this->_config->getJobs()[$groupId];
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $connection = $scheduleResource->getConnection();
+ $count = 0;
+ foreach ($historyLifetimes as $status => $time) {
+- $count += $connection->delete(
+- $scheduleResource->getMainTable(),
++ $count += $this->cleanup(
+ [
+ 'status = ?' => $status,
+ 'job_code in (?)' => array_keys($jobs),
+- 'created_at < ?' => $connection->formatDate($currentTime - $time)
++ 'created_at < ?' => $this->_scheduleFactory
++ ->create()
++ ->getResource()
++ ->getConnection()
++ ->formatDate($currentTime - $time)
+ ]
+ );
+ }
+@@ -516,6 +562,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get config of schedule.
++ *
+ * @param array $jobConfig
+ * @return mixed
+ */
+@@ -530,6 +578,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Save a schedule of cron job.
++ *
+ * @param string $jobCode
+ * @param string $cronExpression
+ * @param int $timeInterval
+@@ -562,6 +612,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Create a schedule of cron job.
++ *
+ * @param string $jobCode
+ * @param string $cronExpression
+ * @param int $time
+@@ -580,6 +632,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get time interval for scheduling.
++ *
+ * @param string $groupId
+ * @return int
+ */
+@@ -592,8 +646,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Clean up scheduled jobs that are disabled in the configuration
+- * This can happen when you turn off a cron job in the config and flush the cache
++ * Clean up scheduled jobs that are disabled in the configuration.
++ *
++ * This can happen when you turn off a cron job in the config and flush the cache.
+ *
+ * @param string $groupId
+ * @return void
+@@ -610,9 +665,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ if (count($jobsToCleanup) > 0) {
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $count = $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $count = $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code in (?)' => $jobsToCleanup,
+@@ -624,6 +677,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
++ * Get cron expression of cron job.
++ *
+ * @param array $jobConfig
+ * @return null|string
+ */
+@@ -643,29 +698,32 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Clean up scheduled jobs that do not match their cron expression anymore
+- * This can happen when you change the cron expression and flush the cache
++ * Clean up scheduled jobs that do not match their cron expression anymore.
++ *
++ * This can happen when you change the cron expression and flush the cache.
+ *
+ * @return $this
+ */
+ private function cleanupScheduleMismatches()
+ {
+- /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+ foreach ($this->invalid as $jobCode => $scheduledAtList) {
+- $scheduleResource->getConnection()->delete($scheduleResource->getMainTable(), [
+- 'status = ?' => Schedule::STATUS_PENDING,
+- 'job_code = ?' => $jobCode,
+- 'scheduled_at in (?)' => $scheduledAtList,
+- ]);
++ $this->cleanup(
++ [
++ 'status = ?' => Schedule::STATUS_PENDING,
++ 'job_code = ?' => $jobCode,
++ 'scheduled_at in (?)' => $scheduledAtList,
++ ]
++ );
+ }
++
+ return $this;
+ }
+
+ /**
+- * Get CronGroup Configuration Value
++ * Get CronGroup Configuration Value.
+ *
+- * @param $groupId
++ * @param string $groupId
++ * @param string $path
+ * @return int
+ */
+ private function getCronGroupConfigurationValue($groupId, $path)
+@@ -677,9 +735,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Is Group In Filter
++ * Is Group In Filter.
+ *
+- * @param $groupId
++ * @param string $groupId
+ * @return bool
+ */
+ private function isGroupInFilter($groupId): bool
+@@ -689,17 +747,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ /**
+- * Process pending jobs
++ * Process pending jobs.
+ *
+- * @param $groupId
+- * @param $jobsRoot
+- * @param $currentTime
++ * @param string $groupId
++ * @param array $jobsRoot
++ * @param int $currentTime
+ */
+ private function processPendingJobs($groupId, $jobsRoot, $currentTime)
+ {
+ $procesedJobs = [];
+ $pendingJobs = $this->getPendingSchedules($groupId);
+- /** @var \Magento\Cron\Model\Schedule $schedule */
++ /** @var Schedule $schedule */
+ foreach ($pendingJobs as $schedule) {
+ if (isset($procesedJobs[$schedule->getJobCode()])) {
+ // process only on job per run
+@@ -715,26 +773,59 @@ class ProcessCronQueueObserver implements ObserverInterface
+ continue;
+ }
+
+- try {
+- if ($schedule->tryLockJob()) {
+- $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
+- }
+- } catch (\Exception $e) {
+- $this->processError($schedule, $e);
+- }
++ $this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++
+ if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
+ $procesedJobs[$schedule->getJobCode()] = true;
+ }
+- $schedule->save();
++
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+ }
+ }
+
+ /**
++ * Try to acquire lock for cron job and try to run this job.
++ *
++ * @param int $scheduledTime
++ * @param int $currentTime
++ * @param string[] $jobConfig
++ * @param Schedule $schedule
++ * @param string $groupId
++ */
++ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)
++ {
++ // use sha1 to limit length
++ // phpcs:ignore Magento2.Security.InsecureFunction
++ $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
++
++ try {
++ for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
++ if ($this->lockManager->lock($lockName, 0) && $schedule->tryLockJob()) {
++ $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++ break;
++ }
++ $this->logger->warning("Could not acquire lock for cron job: {$schedule->getJobCode()}");
++ }
++ } catch (\Exception $e) {
++ $this->processError($schedule, $e);
++ } finally {
++ $this->lockManager->unlock($lockName);
++ }
++ }
++
++ /**
++ * Process error messages.
++ *
+ * @param Schedule $schedule
+ * @param \Exception $exception
+ * @return void
+ */
+- private function processError(\Magento\Cron\Model\Schedule $schedule, \Exception $exception)
++ private function processError(Schedule $schedule, \Exception $exception)
+ {
+ $schedule->setMessages($exception->getMessage());
+ if ($schedule->getStatus() === Schedule::STATUS_ERROR) {
+@@ -746,4 +837,26 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->logger->info($schedule->getMessages());
+ }
+ }
++
++ /**
++ * Clean up schedule
++ *
++ * @param mixed $where
++ * @return int
++ */
++ private function cleanup($where = ''): int
++ {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_scheduleFactory->create()->getResource();
++
++ return (int) $this->retrier->execute(
++ function () use ($scheduleResource, $where) {
++ return $scheduleResource->getConnection()->delete(
++ $scheduleResource->getTable('cron_schedule'),
++ $where
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++ }
+ }
+diff -Naur a/vendor/magento/module-cron/etc/di.xml b/vendor/magento/module-cron/etc/di.xml
+--- a/vendor/magento/module-cron/etc/di.xml
++++ b/vendor/magento/module-cron/etc/di.xml
+@@ -76,4 +76,5 @@
+
+
+
++
+
diff --git a/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.0.patch b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.0.patch
new file mode 100644
index 0000000..bd083e1
--- /dev/null
+++ b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.0.patch
@@ -0,0 +1,623 @@
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrier.php b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+@@ -0,0 +1,39 @@
++getTransactionLevel() !== 0) {
++ return $callback();
++ }
++
++ for ($retries = self::MAX_RETRIES - 1; $retries > 0; $retries--) {
++ try {
++ return $callback();
++ } catch (DeadlockException $e) {
++ continue;
++ }
++ }
++
++ return $callback();
++ }
++}
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+@@ -0,0 +1,33 @@
++timezoneConverter = $timezoneConverter ?: ObjectManager::getInstance()->get(TimezoneInterface::class);
++ $this->dateTimeFactory = $dateTimeFactory ?: ObjectManager::getInstance()->get(DateTimeFactory::class);
++ $this->retrier = $retrier ?: ObjectManager::getInstance()->get(DeadlockRetrierInterface::class);
+ }
+
+ /**
+@@ -88,7 +105,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ public function setCronExpr($expr)
+ {
+ $e = preg_split('#\s+#', $expr, null, PREG_SPLIT_NO_EMPTY);
+- if (sizeof($e) < 5 || sizeof($e) > 6) {
++ if (count($e) < 5 || count($e) > 6) {
+ throw new CronException(__('Invalid cron expression: %1', $expr));
+ }
+
+@@ -111,17 +128,20 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ if (!$e || !$time) {
+ return false;
+ }
++ $configTimeZone = $this->timezoneConverter->getConfigTimezone();
++ $storeDateTime = $this->dateTimeFactory->create(null, new \DateTimeZone($configTimeZone));
+ if (!is_numeric($time)) {
+ //convert time from UTC to admin store timezone
+ //we assume that all schedules in configuration (crontab.xml and DB tables) are in admin store timezone
+- $time = $this->timezoneConverter->date($time)->format('Y-m-d H:i');
+- $time = strtotime($time);
++ $dateTimeUtc = $this->dateTimeFactory->create($time);
++ $time = $dateTimeUtc->getTimestamp();
+ }
+- $match = $this->matchCronExpression($e[0], strftime('%M', $time))
+- && $this->matchCronExpression($e[1], strftime('%H', $time))
+- && $this->matchCronExpression($e[2], strftime('%d', $time))
+- && $this->matchCronExpression($e[3], strftime('%m', $time))
+- && $this->matchCronExpression($e[4], strftime('%w', $time));
++ $time = $storeDateTime->setTimestamp($time);
++ $match = $this->matchCronExpression($e[0], $time->format('i'))
++ && $this->matchCronExpression($e[1], $time->format('H'))
++ && $this->matchCronExpression($e[2], $time->format('d'))
++ && $this->matchCronExpression($e[3], $time->format('m'))
++ && $this->matchCronExpression($e[4], $time->format('w'));
+
+ return $match;
+ }
+@@ -156,7 +176,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ // handle modulus
+ if (strpos($expr, '/') !== false) {
+ $e = explode('/', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'match/modulus\': %1', $expr));
+ }
+ if (!is_numeric($e[1])) {
+@@ -175,7 +195,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ } elseif (strpos($expr, '-') !== false) {
+ // handle range
+ $e = explode('-', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'from-to\' structure: %1', $expr));
+ }
+
+@@ -239,21 +259,42 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Lock the cron job so no other scheduled instances run simultaneously.
++ * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING.
+ *
+- * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING
+- * and no other jobs of the same code are currently in STATUS_RUNNING.
+ * Returns true if status was changed and false otherwise.
+ *
+ * @return boolean
+ */
+ public function tryLockJob()
+ {
+- if ($this->_getResource()->trySetJobUniqueStatusAtomic(
+- $this->getId(),
+- self::STATUS_RUNNING,
+- self::STATUS_PENDING
+- )) {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_getResource();
++
++ // Change statuses from running to error for terminated jobs
++ $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->getConnection()->update(
++ $scheduleResource->getTable('cron_schedule'),
++ ['status' => self::STATUS_ERROR],
++ ['job_code = ?' => $this->getJobCode(), 'status = ?' => self::STATUS_RUNNING]
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ // Change status from pending to running for ran jobs
++ $result = $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->trySetJobStatusAtomic(
++ $this->getId(),
++ self::STATUS_RUNNING,
++ self::STATUS_PENDING
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ if ($result) {
+ $this->setStatus(self::STATUS_RUNNING);
+ return true;
+ }
+diff -Naur a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+--- a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
++++ b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+@@ -9,17 +9,19 @@
+ */
+ namespace Magento\Cron\Observer;
+
++use Magento\Cron\Model\Schedule;
+ use Magento\Framework\App\State;
+ use Magento\Framework\Console\Cli;
+ use Magento\Framework\Event\ObserverInterface;
+-use Magento\Cron\Model\Schedule;
+ use Magento\Framework\Profiler\Driver\Standard\Stat;
+ use Magento\Framework\Profiler\Driver\Standard\StatFactory;
++use Magento\Cron\Model\DeadlockRetrierInterface;
+
+ /**
+ * The observer for processing cron jobs.
+ *
+ * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
++ * @SuppressWarnings(PHPMD.TooManyFields)
+ */
+ class ProcessCronQueueObserver implements ObserverInterface
+ {
+@@ -63,12 +65,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * How long to wait for cron group to become unlocked
+ */
+- const LOCK_TIMEOUT = 5;
++ const LOCK_TIMEOUT = 60;
+
+ /**
+ * Static lock prefix for cron group locking
+ */
+- const LOCK_PREFIX = 'CRON_GROUP_';
++ const LOCK_PREFIX = 'CRON_';
++
++ /**
++ * Max retries for acquire locks for cron jobs
++ */
++ const MAX_RETRIES = 5;
+
+ /**
+ * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
+@@ -145,6 +152,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private $statProfiler;
+
++ /**
++ * @var \Magento\Framework\Event\ManagerInterface
++ */
++ private $eventManager;
++
++ /**
++ * @var DeadlockRetrierInterface
++ */
++ private $retrier;
++
+ /**
+ * @param \Magento\Framework\ObjectManagerInterface $objectManager
+ * @param \Magento\Cron\Model\ScheduleFactory $scheduleFactory
+@@ -159,6 +176,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param State $state
+ * @param StatFactory $statFactory
+ * @param \Magento\Framework\Lock\LockManagerInterface $lockManager
++ * @param \Magento\Framework\Event\ManagerInterface $eventManager
++ * @param DeadlockRetrierInterface $retrier
+ * @SuppressWarnings(PHPMD.ExcessiveParameterList)
+ */
+ public function __construct(
+@@ -174,7 +193,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ \Psr\Log\LoggerInterface $logger,
+ \Magento\Framework\App\State $state,
+ StatFactory $statFactory,
+- \Magento\Framework\Lock\LockManagerInterface $lockManager
++ \Magento\Framework\Lock\LockManagerInterface $lockManager,
++ \Magento\Framework\Event\ManagerInterface $eventManager,
++ DeadlockRetrierInterface $retrier
+ ) {
+ $this->_objectManager = $objectManager;
+ $this->_scheduleFactory = $scheduleFactory;
+@@ -189,6 +210,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->state = $state;
+ $this->statProfiler = $statFactory->create();
+ $this->lockManager = $lockManager;
++ $this->eventManager = $eventManager;
++ $this->retrier = $retrier;
+ }
+
+ /**
+@@ -204,7 +227,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ public function execute(\Magento\Framework\Event\Observer $observer)
+ {
+-
+ $currentTime = $this->dateTime->gmtTimestamp();
+ $jobGroupsRoot = $this->_config->getJobs();
+ // sort jobs groups to start from used in separated process
+@@ -237,12 +259,12 @@ class ProcessCronQueueObserver implements ObserverInterface
+
+ $this->lockGroup(
+ $groupId,
+- function ($groupId) use ($currentTime, $jobsRoot) {
++ function ($groupId) use ($currentTime) {
+ $this->cleanupJobs($groupId, $currentTime);
+ $this->generateSchedules($groupId);
+- $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ );
++ $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ }
+
+@@ -258,7 +280,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function lockGroup($groupId, callable $callback)
+ {
+-
+ if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
+ $this->logger->warning(
+ sprintf(
+@@ -293,36 +314,50 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $scheduleLifetime = $scheduleLifetime * self::SECONDS_IN_MINUTE;
+ if ($scheduledTime < $currentTime - $scheduleLifetime) {
+ $schedule->setStatus(Schedule::STATUS_MISSED);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(sprintf('Cron Job %s is missed at %s', $jobCode, $schedule->getScheduledAt()));
+ }
+
+ if (!isset($jobConfig['instance'], $jobConfig['method'])) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
+- throw new \Exception('No callbacks found');
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
++ throw new \Exception(sprintf('No callbacks found for cron job %s', $jobCode));
+ }
+ $model = $this->_objectManager->create($jobConfig['instance']);
+ $callback = [$model, $jobConfig['method']];
+ if (!is_callable($callback)) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(
+ sprintf('Invalid callback: %s::%s can\'t be called', $jobConfig['instance'], $jobConfig['method'])
+ );
+ }
+
+- $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()))->save();
++ $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()));
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+
+ $this->startProfiling();
++ $this->eventManager->dispatch('cron_job_run', ['job_name' => 'cron/' . $groupId . '/' . $jobCode]);
++
+ try {
+ $this->logger->info(sprintf('Cron Job %s is run', $jobCode));
++ //phpcs:ignore Magento2.Functions.DiscouragedFunction
+ call_user_func_array($callback, [$schedule]);
+ } catch (\Throwable $e) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
+- $this->logger->error(sprintf(
+- 'Cron Job %s has an error: %s. Statistics: %s',
+- $jobCode,
+- $e->getMessage(),
+- $this->getProfilingStat()
+- ));
++ $this->logger->error(
++ sprintf(
++ 'Cron Job %s has an error: %s. Statistics: %s',
++ $jobCode,
++ $e->getMessage(),
++ $this->getProfilingStat()
++ )
++ );
+ if (!$e instanceof \Exception) {
+ $e = new \RuntimeException(
+ 'Error when running a cron job',
+@@ -335,16 +370,22 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->stopProfiling();
+ }
+
+- $schedule->setStatus(Schedule::STATUS_SUCCESS)->setFinishedAt(strftime(
+- '%Y-%m-%d %H:%M:%S',
+- $this->dateTime->gmtTimestamp()
+- ));
++ $schedule->setStatus(
++ Schedule::STATUS_SUCCESS
++ )->setFinishedAt(
++ strftime(
++ '%Y-%m-%d %H:%M:%S',
++ $this->dateTime->gmtTimestamp()
++ )
++ );
+
+- $this->logger->info(sprintf(
+- 'Cron Job %s is successfully finished. Statistics: %s',
+- $jobCode,
+- $this->getProfilingStat()
+- ));
++ $this->logger->info(
++ sprintf(
++ 'Cron Job %s is successfully finished. Statistics: %s',
++ $jobCode,
++ $this->getProfilingStat()
++ )
++ );
+ }
+
+ /**
+@@ -500,16 +541,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ];
+
+ $jobs = $this->_config->getJobs()[$groupId];
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $connection = $scheduleResource->getConnection();
+ $count = 0;
+ foreach ($historyLifetimes as $status => $time) {
+- $count += $connection->delete(
+- $scheduleResource->getMainTable(),
++ $count += $this->cleanup(
+ [
+ 'status = ?' => $status,
+ 'job_code in (?)' => array_keys($jobs),
+- 'created_at < ?' => $connection->formatDate($currentTime - $time)
++ 'created_at < ?' => $this->_scheduleFactory
++ ->create()
++ ->getResource()
++ ->getConnection()
++ ->formatDate($currentTime - $time)
+ ]
+ );
+ }
+@@ -623,9 +665,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ if (count($jobsToCleanup) > 0) {
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $count = $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $count = $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code in (?)' => $jobsToCleanup,
+@@ -666,15 +706,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function cleanupScheduleMismatches()
+ {
+- /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+ foreach ($this->invalid as $jobCode => $scheduledAtList) {
+- $scheduleResource->getConnection()->delete($scheduleResource->getMainTable(), [
+- 'status = ?' => Schedule::STATUS_PENDING,
+- 'job_code = ?' => $jobCode,
+- 'scheduled_at in (?)' => $scheduledAtList,
+- ]);
++ $this->cleanup(
++ [
++ 'status = ?' => Schedule::STATUS_PENDING,
++ 'job_code = ?' => $jobCode,
++ 'scheduled_at in (?)' => $scheduledAtList,
++ ]
++ );
+ }
++
+ return $this;
+ }
+
+@@ -716,7 +757,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ {
+ $procesedJobs = [];
+ $pendingJobs = $this->getPendingSchedules($groupId);
+- /** @var \Magento\Cron\Model\Schedule $schedule */
++ /** @var Schedule $schedule */
+ foreach ($pendingJobs as $schedule) {
+ if (isset($procesedJobs[$schedule->getJobCode()])) {
+ // process only on job per run
+@@ -732,17 +773,48 @@ class ProcessCronQueueObserver implements ObserverInterface
+ continue;
+ }
+
+- try {
+- if ($schedule->tryLockJob()) {
+- $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
+- }
+- } catch (\Exception $e) {
+- $this->processError($schedule, $e);
+- }
++ $this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++
+ if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
+ $procesedJobs[$schedule->getJobCode()] = true;
+ }
+- $schedule->save();
++
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
++ }
++ }
++
++ /**
++ * Try to acquire lock for cron job and try to run this job.
++ *
++ * @param int $scheduledTime
++ * @param int $currentTime
++ * @param string[] $jobConfig
++ * @param Schedule $schedule
++ * @param string $groupId
++ */
++ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)
++ {
++ // use sha1 to limit length
++ // phpcs:ignore Magento2.Security.InsecureFunction
++ $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
++
++ try {
++ for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
++ if ($this->lockManager->lock($lockName, 0) && $schedule->tryLockJob()) {
++ $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++ break;
++ }
++ $this->logger->warning("Could not acquire lock for cron job: {$schedule->getJobCode()}");
++ }
++ } catch (\Exception $e) {
++ $this->processError($schedule, $e);
++ } finally {
++ $this->lockManager->unlock($lockName);
+ }
+ }
+
+@@ -753,7 +825,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param \Exception $exception
+ * @return void
+ */
+- private function processError(\Magento\Cron\Model\Schedule $schedule, \Exception $exception)
++ private function processError(Schedule $schedule, \Exception $exception)
+ {
+ $schedule->setMessages($exception->getMessage());
+ if ($schedule->getStatus() === Schedule::STATUS_ERROR) {
+@@ -765,4 +837,26 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->logger->info($schedule->getMessages());
+ }
+ }
++
++ /**
++ * Clean up schedule
++ *
++ * @param mixed $where
++ * @return int
++ */
++ private function cleanup($where = ''): int
++ {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_scheduleFactory->create()->getResource();
++
++ return (int) $this->retrier->execute(
++ function () use ($scheduleResource, $where) {
++ return $scheduleResource->getConnection()->delete(
++ $scheduleResource->getTable('cron_schedule'),
++ $where
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++ }
+ }
+diff -Naur a/vendor/magento/module-cron/etc/di.xml b/vendor/magento/module-cron/etc/di.xml
+--- a/vendor/magento/module-cron/etc/di.xml
++++ b/vendor/magento/module-cron/etc/di.xml
+@@ -63,4 +63,5 @@
+
+
+
++
+
diff --git a/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.1.patch b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.1.patch
new file mode 100644
index 0000000..a267f7e
--- /dev/null
+++ b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.1.patch
@@ -0,0 +1,622 @@
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrier.php b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+@@ -0,0 +1,39 @@
++getTransactionLevel() !== 0) {
++ return $callback();
++ }
++
++ for ($retries = self::MAX_RETRIES - 1; $retries > 0; $retries--) {
++ try {
++ return $callback();
++ } catch (DeadlockException $e) {
++ continue;
++ }
++ }
++
++ return $callback();
++ }
++}
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+@@ -0,0 +1,33 @@
++timezoneConverter = $timezoneConverter ?: ObjectManager::getInstance()->get(TimezoneInterface::class);
++ $this->dateTimeFactory = $dateTimeFactory ?: ObjectManager::getInstance()->get(DateTimeFactory::class);
++ $this->retrier = $retrier ?: ObjectManager::getInstance()->get(DeadlockRetrierInterface::class);
+ }
+
+ /**
+@@ -88,7 +105,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ public function setCronExpr($expr)
+ {
+ $e = preg_split('#\s+#', $expr, null, PREG_SPLIT_NO_EMPTY);
+- if (sizeof($e) < 5 || sizeof($e) > 6) {
++ if (count($e) < 5 || count($e) > 6) {
+ throw new CronException(__('Invalid cron expression: %1', $expr));
+ }
+
+@@ -111,17 +128,20 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ if (!$e || !$time) {
+ return false;
+ }
++ $configTimeZone = $this->timezoneConverter->getConfigTimezone();
++ $storeDateTime = $this->dateTimeFactory->create(null, new \DateTimeZone($configTimeZone));
+ if (!is_numeric($time)) {
+ //convert time from UTC to admin store timezone
+ //we assume that all schedules in configuration (crontab.xml and DB tables) are in admin store timezone
+- $time = $this->timezoneConverter->date($time)->format('Y-m-d H:i');
+- $time = strtotime($time);
++ $dateTimeUtc = $this->dateTimeFactory->create($time);
++ $time = $dateTimeUtc->getTimestamp();
+ }
+- $match = $this->matchCronExpression($e[0], strftime('%M', $time))
+- && $this->matchCronExpression($e[1], strftime('%H', $time))
+- && $this->matchCronExpression($e[2], strftime('%d', $time))
+- && $this->matchCronExpression($e[3], strftime('%m', $time))
+- && $this->matchCronExpression($e[4], strftime('%w', $time));
++ $time = $storeDateTime->setTimestamp($time);
++ $match = $this->matchCronExpression($e[0], $time->format('i'))
++ && $this->matchCronExpression($e[1], $time->format('H'))
++ && $this->matchCronExpression($e[2], $time->format('d'))
++ && $this->matchCronExpression($e[3], $time->format('m'))
++ && $this->matchCronExpression($e[4], $time->format('w'));
+
+ return $match;
+ }
+@@ -156,7 +176,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ // handle modulus
+ if (strpos($expr, '/') !== false) {
+ $e = explode('/', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'match/modulus\': %1', $expr));
+ }
+ if (!is_numeric($e[1])) {
+@@ -175,7 +195,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ } elseif (strpos($expr, '-') !== false) {
+ // handle range
+ $e = explode('-', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'from-to\' structure: %1', $expr));
+ }
+
+@@ -239,21 +259,42 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Lock the cron job so no other scheduled instances run simultaneously.
++ * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING.
+ *
+- * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING
+- * and no other jobs of the same code are currently in STATUS_RUNNING.
+ * Returns true if status was changed and false otherwise.
+ *
+ * @return boolean
+ */
+ public function tryLockJob()
+ {
+- if ($this->_getResource()->trySetJobUniqueStatusAtomic(
+- $this->getId(),
+- self::STATUS_RUNNING,
+- self::STATUS_PENDING
+- )) {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_getResource();
++
++ // Change statuses from running to error for terminated jobs
++ $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->getConnection()->update(
++ $scheduleResource->getTable('cron_schedule'),
++ ['status' => self::STATUS_ERROR],
++ ['job_code = ?' => $this->getJobCode(), 'status = ?' => self::STATUS_RUNNING]
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ // Change status from pending to running for ran jobs
++ $result = $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->trySetJobStatusAtomic(
++ $this->getId(),
++ self::STATUS_RUNNING,
++ self::STATUS_PENDING
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ if ($result) {
+ $this->setStatus(self::STATUS_RUNNING);
+ return true;
+ }
+diff -Naur a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+--- a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
++++ b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+@@ -9,17 +9,19 @@
+ */
+ namespace Magento\Cron\Observer;
+
++use Magento\Cron\Model\Schedule;
+ use Magento\Framework\App\State;
+ use Magento\Framework\Console\Cli;
+ use Magento\Framework\Event\ObserverInterface;
+-use Magento\Cron\Model\Schedule;
+ use Magento\Framework\Profiler\Driver\Standard\Stat;
+ use Magento\Framework\Profiler\Driver\Standard\StatFactory;
++use Magento\Cron\Model\DeadlockRetrierInterface;
+
+ /**
+ * The observer for processing cron jobs.
+ *
+ * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
++ * @SuppressWarnings(PHPMD.TooManyFields)
+ */
+ class ProcessCronQueueObserver implements ObserverInterface
+ {
+@@ -63,12 +65,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * How long to wait for cron group to become unlocked
+ */
+- const LOCK_TIMEOUT = 5;
++ const LOCK_TIMEOUT = 60;
+
+ /**
+ * Static lock prefix for cron group locking
+ */
+- const LOCK_PREFIX = 'CRON_GROUP_';
++ const LOCK_PREFIX = 'CRON_';
++
++ /**
++ * Max retries for acquire locks for cron jobs
++ */
++ const MAX_RETRIES = 5;
+
+ /**
+ * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
+@@ -145,6 +152,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private $statProfiler;
+
++ /**
++ * @var \Magento\Framework\Event\ManagerInterface
++ */
++ private $eventManager;
++
++ /**
++ * @var DeadlockRetrierInterface
++ */
++ private $retrier;
++
+ /**
+ * @param \Magento\Framework\ObjectManagerInterface $objectManager
+ * @param \Magento\Cron\Model\ScheduleFactory $scheduleFactory
+@@ -159,6 +176,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param State $state
+ * @param StatFactory $statFactory
+ * @param \Magento\Framework\Lock\LockManagerInterface $lockManager
++ * @param \Magento\Framework\Event\ManagerInterface $eventManager
++ * @param DeadlockRetrierInterface $retrier
+ * @SuppressWarnings(PHPMD.ExcessiveParameterList)
+ */
+ public function __construct(
+@@ -174,7 +193,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ \Psr\Log\LoggerInterface $logger,
+ \Magento\Framework\App\State $state,
+ StatFactory $statFactory,
+- \Magento\Framework\Lock\LockManagerInterface $lockManager
++ \Magento\Framework\Lock\LockManagerInterface $lockManager,
++ \Magento\Framework\Event\ManagerInterface $eventManager,
++ DeadlockRetrierInterface $retrier
+ ) {
+ $this->_objectManager = $objectManager;
+ $this->_scheduleFactory = $scheduleFactory;
+@@ -189,6 +210,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->state = $state;
+ $this->statProfiler = $statFactory->create();
+ $this->lockManager = $lockManager;
++ $this->eventManager = $eventManager;
++ $this->retrier = $retrier;
+ }
+
+ /**
+@@ -204,7 +227,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ public function execute(\Magento\Framework\Event\Observer $observer)
+ {
+-
+ $currentTime = $this->dateTime->gmtTimestamp();
+ $jobGroupsRoot = $this->_config->getJobs();
+ // sort jobs groups to start from used in separated process
+@@ -237,12 +259,12 @@ class ProcessCronQueueObserver implements ObserverInterface
+
+ $this->lockGroup(
+ $groupId,
+- function ($groupId) use ($currentTime, $jobsRoot) {
++ function ($groupId) use ($currentTime) {
+ $this->cleanupJobs($groupId, $currentTime);
+ $this->generateSchedules($groupId);
+- $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ );
++ $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ }
+
+@@ -258,7 +280,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function lockGroup($groupId, callable $callback)
+ {
+-
+ if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
+ $this->logger->warning(
+ sprintf(
+@@ -293,36 +314,50 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $scheduleLifetime = $scheduleLifetime * self::SECONDS_IN_MINUTE;
+ if ($scheduledTime < $currentTime - $scheduleLifetime) {
+ $schedule->setStatus(Schedule::STATUS_MISSED);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(sprintf('Cron Job %s is missed at %s', $jobCode, $schedule->getScheduledAt()));
+ }
+
+ if (!isset($jobConfig['instance'], $jobConfig['method'])) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(sprintf('No callbacks found for cron job %s', $jobCode));
+ }
+ $model = $this->_objectManager->create($jobConfig['instance']);
+ $callback = [$model, $jobConfig['method']];
+ if (!is_callable($callback)) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(
+ sprintf('Invalid callback: %s::%s can\'t be called', $jobConfig['instance'], $jobConfig['method'])
+ );
+ }
+
+- $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()))->save();
++ $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()));
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+
+ $this->startProfiling();
++ $this->eventManager->dispatch('cron_job_run', ['job_name' => 'cron/' . $groupId . '/' . $jobCode]);
++
+ try {
+ $this->logger->info(sprintf('Cron Job %s is run', $jobCode));
++ //phpcs:ignore Magento2.Functions.DiscouragedFunction
+ call_user_func_array($callback, [$schedule]);
+ } catch (\Throwable $e) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
+- $this->logger->error(sprintf(
+- 'Cron Job %s has an error: %s. Statistics: %s',
+- $jobCode,
+- $e->getMessage(),
+- $this->getProfilingStat()
+- ));
++ $this->logger->error(
++ sprintf(
++ 'Cron Job %s has an error: %s. Statistics: %s',
++ $jobCode,
++ $e->getMessage(),
++ $this->getProfilingStat()
++ )
++ );
+ if (!$e instanceof \Exception) {
+ $e = new \RuntimeException(
+ 'Error when running a cron job',
+@@ -335,16 +370,22 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->stopProfiling();
+ }
+
+- $schedule->setStatus(Schedule::STATUS_SUCCESS)->setFinishedAt(strftime(
+- '%Y-%m-%d %H:%M:%S',
+- $this->dateTime->gmtTimestamp()
+- ));
++ $schedule->setStatus(
++ Schedule::STATUS_SUCCESS
++ )->setFinishedAt(
++ strftime(
++ '%Y-%m-%d %H:%M:%S',
++ $this->dateTime->gmtTimestamp()
++ )
++ );
+
+- $this->logger->info(sprintf(
+- 'Cron Job %s is successfully finished. Statistics: %s',
+- $jobCode,
+- $this->getProfilingStat()
+- ));
++ $this->logger->info(
++ sprintf(
++ 'Cron Job %s is successfully finished. Statistics: %s',
++ $jobCode,
++ $this->getProfilingStat()
++ )
++ );
+ }
+
+ /**
+@@ -500,16 +541,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ];
+
+ $jobs = $this->_config->getJobs()[$groupId];
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $connection = $scheduleResource->getConnection();
+ $count = 0;
+ foreach ($historyLifetimes as $status => $time) {
+- $count += $connection->delete(
+- $scheduleResource->getMainTable(),
++ $count += $this->cleanup(
+ [
+ 'status = ?' => $status,
+ 'job_code in (?)' => array_keys($jobs),
+- 'created_at < ?' => $connection->formatDate($currentTime - $time)
++ 'created_at < ?' => $this->_scheduleFactory
++ ->create()
++ ->getResource()
++ ->getConnection()
++ ->formatDate($currentTime - $time)
+ ]
+ );
+ }
+@@ -623,9 +665,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ if (count($jobsToCleanup) > 0) {
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $count = $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $count = $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code in (?)' => $jobsToCleanup,
+@@ -666,15 +706,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function cleanupScheduleMismatches()
+ {
+- /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+ foreach ($this->invalid as $jobCode => $scheduledAtList) {
+- $scheduleResource->getConnection()->delete($scheduleResource->getMainTable(), [
+- 'status = ?' => Schedule::STATUS_PENDING,
+- 'job_code = ?' => $jobCode,
+- 'scheduled_at in (?)' => $scheduledAtList,
+- ]);
++ $this->cleanup(
++ [
++ 'status = ?' => Schedule::STATUS_PENDING,
++ 'job_code = ?' => $jobCode,
++ 'scheduled_at in (?)' => $scheduledAtList,
++ ]
++ );
+ }
++
+ return $this;
+ }
+
+@@ -716,7 +757,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ {
+ $procesedJobs = [];
+ $pendingJobs = $this->getPendingSchedules($groupId);
+- /** @var \Magento\Cron\Model\Schedule $schedule */
++ /** @var Schedule $schedule */
+ foreach ($pendingJobs as $schedule) {
+ if (isset($procesedJobs[$schedule->getJobCode()])) {
+ // process only on job per run
+@@ -732,17 +773,48 @@ class ProcessCronQueueObserver implements ObserverInterface
+ continue;
+ }
+
+- try {
+- if ($schedule->tryLockJob()) {
+- $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
+- }
+- } catch (\Exception $e) {
+- $this->processError($schedule, $e);
+- }
++ $this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++
+ if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
+ $procesedJobs[$schedule->getJobCode()] = true;
+ }
+- $schedule->save();
++
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
++ }
++ }
++
++ /**
++ * Try to acquire lock for cron job and try to run this job.
++ *
++ * @param int $scheduledTime
++ * @param int $currentTime
++ * @param string[] $jobConfig
++ * @param Schedule $schedule
++ * @param string $groupId
++ */
++ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)
++ {
++ // use sha1 to limit length
++ // phpcs:ignore Magento2.Security.InsecureFunction
++ $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
++
++ try {
++ for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
++ if ($this->lockManager->lock($lockName, 0) && $schedule->tryLockJob()) {
++ $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++ break;
++ }
++ $this->logger->warning("Could not acquire lock for cron job: {$schedule->getJobCode()}");
++ }
++ } catch (\Exception $e) {
++ $this->processError($schedule, $e);
++ } finally {
++ $this->lockManager->unlock($lockName);
+ }
+ }
+
+@@ -753,7 +825,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param \Exception $exception
+ * @return void
+ */
+- private function processError(\Magento\Cron\Model\Schedule $schedule, \Exception $exception)
++ private function processError(Schedule $schedule, \Exception $exception)
+ {
+ $schedule->setMessages($exception->getMessage());
+ if ($schedule->getStatus() === Schedule::STATUS_ERROR) {
+@@ -765,4 +837,26 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->logger->info($schedule->getMessages());
+ }
+ }
++
++ /**
++ * Clean up schedule
++ *
++ * @param mixed $where
++ * @return int
++ */
++ private function cleanup($where = ''): int
++ {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_scheduleFactory->create()->getResource();
++
++ return (int) $this->retrier->execute(
++ function () use ($scheduleResource, $where) {
++ return $scheduleResource->getConnection()->delete(
++ $scheduleResource->getTable('cron_schedule'),
++ $where
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++ }
+ }
+diff -Naur a/vendor/magento/module-cron/etc/di.xml b/vendor/magento/module-cron/etc/di.xml
+--- a/vendor/magento/module-cron/etc/di.xml
++++ b/vendor/magento/module-cron/etc/di.xml
+@@ -76,4 +76,5 @@
+
+
+
++
+
diff --git a/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.2.patch b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.2.patch
new file mode 100644
index 0000000..d557e39
--- /dev/null
+++ b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.2.patch
@@ -0,0 +1,578 @@
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrier.php b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+@@ -0,0 +1,39 @@
++getTransactionLevel() !== 0) {
++ return $callback();
++ }
++
++ for ($retries = self::MAX_RETRIES - 1; $retries > 0; $retries--) {
++ try {
++ return $callback();
++ } catch (DeadlockException $e) {
++ continue;
++ }
++ }
++
++ return $callback();
++ }
++}
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+@@ -0,0 +1,33 @@
++timezoneConverter = $timezoneConverter ?: ObjectManager::getInstance()->get(TimezoneInterface::class);
+ $this->dateTimeFactory = $dateTimeFactory ?: ObjectManager::getInstance()->get(DateTimeFactory::class);
++ $this->retrier = $retrier ?: ObjectManager::getInstance()->get(DeadlockRetrierInterface::class);
+ }
+
+ /**
+@@ -97,7 +105,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ public function setCronExpr($expr)
+ {
+ $e = preg_split('#\s+#', $expr, null, PREG_SPLIT_NO_EMPTY);
+- if (sizeof($e) < 5 || sizeof($e) > 6) {
++ if (count($e) < 5 || count($e) > 6) {
+ throw new CronException(__('Invalid cron expression: %1', $expr));
+ }
+
+@@ -168,7 +176,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ // handle modulus
+ if (strpos($expr, '/') !== false) {
+ $e = explode('/', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'match/modulus\': %1', $expr));
+ }
+ if (!is_numeric($e[1])) {
+@@ -187,7 +195,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ } elseif (strpos($expr, '-') !== false) {
+ // handle range
+ $e = explode('-', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'from-to\' structure: %1', $expr));
+ }
+
+@@ -251,21 +259,42 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Lock the cron job so no other scheduled instances run simultaneously.
++ * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING.
+ *
+- * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING
+- * and no other jobs of the same code are currently in STATUS_RUNNING.
+ * Returns true if status was changed and false otherwise.
+ *
+ * @return boolean
+ */
+ public function tryLockJob()
+ {
+- if ($this->_getResource()->trySetJobUniqueStatusAtomic(
+- $this->getId(),
+- self::STATUS_RUNNING,
+- self::STATUS_PENDING
+- )) {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_getResource();
++
++ // Change statuses from running to error for terminated jobs
++ $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->getConnection()->update(
++ $scheduleResource->getTable('cron_schedule'),
++ ['status' => self::STATUS_ERROR],
++ ['job_code = ?' => $this->getJobCode(), 'status = ?' => self::STATUS_RUNNING]
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ // Change status from pending to running for ran jobs
++ $result = $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->trySetJobStatusAtomic(
++ $this->getId(),
++ self::STATUS_RUNNING,
++ self::STATUS_PENDING
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ if ($result) {
+ $this->setStatus(self::STATUS_RUNNING);
+ return true;
+ }
+diff -Naur a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+--- a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
++++ b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+@@ -9,17 +9,19 @@
+ */
+ namespace Magento\Cron\Observer;
+
++use Magento\Cron\Model\Schedule;
+ use Magento\Framework\App\State;
+ use Magento\Framework\Console\Cli;
+ use Magento\Framework\Event\ObserverInterface;
+-use Magento\Cron\Model\Schedule;
+ use Magento\Framework\Profiler\Driver\Standard\Stat;
+ use Magento\Framework\Profiler\Driver\Standard\StatFactory;
++use Magento\Cron\Model\DeadlockRetrierInterface;
+
+ /**
+ * The observer for processing cron jobs.
+ *
+ * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
++ * @SuppressWarnings(PHPMD.TooManyFields)
+ */
+ class ProcessCronQueueObserver implements ObserverInterface
+ {
+@@ -63,12 +65,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * How long to wait for cron group to become unlocked
+ */
+- const LOCK_TIMEOUT = 5;
++ const LOCK_TIMEOUT = 60;
+
+ /**
+ * Static lock prefix for cron group locking
+ */
+- const LOCK_PREFIX = 'CRON_GROUP_';
++ const LOCK_PREFIX = 'CRON_';
++
++ /**
++ * Max retries for acquire locks for cron jobs
++ */
++ const MAX_RETRIES = 5;
+
+ /**
+ * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
+@@ -145,6 +152,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private $statProfiler;
+
++ /**
++ * @var \Magento\Framework\Event\ManagerInterface
++ */
++ private $eventManager;
++
++ /**
++ * @var DeadlockRetrierInterface
++ */
++ private $retrier;
++
+ /**
+ * @param \Magento\Framework\ObjectManagerInterface $objectManager
+ * @param \Magento\Cron\Model\ScheduleFactory $scheduleFactory
+@@ -159,6 +176,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param State $state
+ * @param StatFactory $statFactory
+ * @param \Magento\Framework\Lock\LockManagerInterface $lockManager
++ * @param \Magento\Framework\Event\ManagerInterface $eventManager
++ * @param DeadlockRetrierInterface $retrier
+ * @SuppressWarnings(PHPMD.ExcessiveParameterList)
+ */
+ public function __construct(
+@@ -174,7 +193,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ \Psr\Log\LoggerInterface $logger,
+ \Magento\Framework\App\State $state,
+ StatFactory $statFactory,
+- \Magento\Framework\Lock\LockManagerInterface $lockManager
++ \Magento\Framework\Lock\LockManagerInterface $lockManager,
++ \Magento\Framework\Event\ManagerInterface $eventManager,
++ DeadlockRetrierInterface $retrier
+ ) {
+ $this->_objectManager = $objectManager;
+ $this->_scheduleFactory = $scheduleFactory;
+@@ -189,6 +210,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->state = $state;
+ $this->statProfiler = $statFactory->create();
+ $this->lockManager = $lockManager;
++ $this->eventManager = $eventManager;
++ $this->retrier = $retrier;
+ }
+
+ /**
+@@ -204,7 +227,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ public function execute(\Magento\Framework\Event\Observer $observer)
+ {
+-
+ $currentTime = $this->dateTime->gmtTimestamp();
+ $jobGroupsRoot = $this->_config->getJobs();
+ // sort jobs groups to start from used in separated process
+@@ -237,12 +259,12 @@ class ProcessCronQueueObserver implements ObserverInterface
+
+ $this->lockGroup(
+ $groupId,
+- function ($groupId) use ($currentTime, $jobsRoot) {
++ function ($groupId) use ($currentTime) {
+ $this->cleanupJobs($groupId, $currentTime);
+ $this->generateSchedules($groupId);
+- $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ );
++ $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ }
+
+@@ -258,7 +280,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function lockGroup($groupId, callable $callback)
+ {
+-
+ if (!$this->lockManager->lock(self::LOCK_PREFIX . $groupId, self::LOCK_TIMEOUT)) {
+ $this->logger->warning(
+ sprintf(
+@@ -293,36 +314,50 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $scheduleLifetime = $scheduleLifetime * self::SECONDS_IN_MINUTE;
+ if ($scheduledTime < $currentTime - $scheduleLifetime) {
+ $schedule->setStatus(Schedule::STATUS_MISSED);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(sprintf('Cron Job %s is missed at %s', $jobCode, $schedule->getScheduledAt()));
+ }
+
+ if (!isset($jobConfig['instance'], $jobConfig['method'])) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(sprintf('No callbacks found for cron job %s', $jobCode));
+ }
+ $model = $this->_objectManager->create($jobConfig['instance']);
+ $callback = [$model, $jobConfig['method']];
+ if (!is_callable($callback)) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
++ // phpcs:ignore Magento2.Exceptions.DirectThrow
+ throw new \Exception(
+ sprintf('Invalid callback: %s::%s can\'t be called', $jobConfig['instance'], $jobConfig['method'])
+ );
+ }
+
+- $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()))->save();
++ $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()));
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+
+ $this->startProfiling();
++ $this->eventManager->dispatch('cron_job_run', ['job_name' => 'cron/' . $groupId . '/' . $jobCode]);
++
+ try {
+ $this->logger->info(sprintf('Cron Job %s is run', $jobCode));
++ //phpcs:ignore Magento2.Functions.DiscouragedFunction
+ call_user_func_array($callback, [$schedule]);
+ } catch (\Throwable $e) {
+ $schedule->setStatus(Schedule::STATUS_ERROR);
+- $this->logger->error(sprintf(
+- 'Cron Job %s has an error: %s. Statistics: %s',
+- $jobCode,
+- $e->getMessage(),
+- $this->getProfilingStat()
+- ));
++ $this->logger->error(
++ sprintf(
++ 'Cron Job %s has an error: %s. Statistics: %s',
++ $jobCode,
++ $e->getMessage(),
++ $this->getProfilingStat()
++ )
++ );
+ if (!$e instanceof \Exception) {
+ $e = new \RuntimeException(
+ 'Error when running a cron job',
+@@ -335,16 +370,22 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->stopProfiling();
+ }
+
+- $schedule->setStatus(Schedule::STATUS_SUCCESS)->setFinishedAt(strftime(
+- '%Y-%m-%d %H:%M:%S',
+- $this->dateTime->gmtTimestamp()
+- ));
++ $schedule->setStatus(
++ Schedule::STATUS_SUCCESS
++ )->setFinishedAt(
++ strftime(
++ '%Y-%m-%d %H:%M:%S',
++ $this->dateTime->gmtTimestamp()
++ )
++ );
+
+- $this->logger->info(sprintf(
+- 'Cron Job %s is successfully finished. Statistics: %s',
+- $jobCode,
+- $this->getProfilingStat()
+- ));
++ $this->logger->info(
++ sprintf(
++ 'Cron Job %s is successfully finished. Statistics: %s',
++ $jobCode,
++ $this->getProfilingStat()
++ )
++ );
+ }
+
+ /**
+@@ -500,16 +541,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ];
+
+ $jobs = $this->_config->getJobs()[$groupId];
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $connection = $scheduleResource->getConnection();
+ $count = 0;
+ foreach ($historyLifetimes as $status => $time) {
+- $count += $connection->delete(
+- $scheduleResource->getMainTable(),
++ $count += $this->cleanup(
+ [
+ 'status = ?' => $status,
+ 'job_code in (?)' => array_keys($jobs),
+- 'created_at < ?' => $connection->formatDate($currentTime - $time)
++ 'created_at < ?' => $this->_scheduleFactory
++ ->create()
++ ->getResource()
++ ->getConnection()
++ ->formatDate($currentTime - $time)
+ ]
+ );
+ }
+@@ -623,9 +665,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ if (count($jobsToCleanup) > 0) {
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $count = $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $count = $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code in (?)' => $jobsToCleanup,
+@@ -666,15 +706,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function cleanupScheduleMismatches()
+ {
+- /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+ foreach ($this->invalid as $jobCode => $scheduledAtList) {
+- $scheduleResource->getConnection()->delete($scheduleResource->getMainTable(), [
+- 'status = ?' => Schedule::STATUS_PENDING,
+- 'job_code = ?' => $jobCode,
+- 'scheduled_at in (?)' => $scheduledAtList,
+- ]);
++ $this->cleanup(
++ [
++ 'status = ?' => Schedule::STATUS_PENDING,
++ 'job_code = ?' => $jobCode,
++ 'scheduled_at in (?)' => $scheduledAtList,
++ ]
++ );
+ }
++
+ return $this;
+ }
+
+@@ -716,7 +757,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ {
+ $procesedJobs = [];
+ $pendingJobs = $this->getPendingSchedules($groupId);
+- /** @var \Magento\Cron\Model\Schedule $schedule */
++ /** @var Schedule $schedule */
+ foreach ($pendingJobs as $schedule) {
+ if (isset($procesedJobs[$schedule->getJobCode()])) {
+ // process only on job per run
+@@ -732,17 +773,48 @@ class ProcessCronQueueObserver implements ObserverInterface
+ continue;
+ }
+
+- try {
+- if ($schedule->tryLockJob()) {
+- $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
+- }
+- } catch (\Exception $e) {
+- $this->processError($schedule, $e);
+- }
++ $this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++
+ if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
+ $procesedJobs[$schedule->getJobCode()] = true;
+ }
+- $schedule->save();
++
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
++ }
++ }
++
++ /**
++ * Try to acquire lock for cron job and try to run this job.
++ *
++ * @param int $scheduledTime
++ * @param int $currentTime
++ * @param string[] $jobConfig
++ * @param Schedule $schedule
++ * @param string $groupId
++ */
++ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)
++ {
++ // use sha1 to limit length
++ // phpcs:ignore Magento2.Security.InsecureFunction
++ $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
++
++ try {
++ for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
++ if ($this->lockManager->lock($lockName, 0) && $schedule->tryLockJob()) {
++ $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++ break;
++ }
++ $this->logger->warning("Could not acquire lock for cron job: {$schedule->getJobCode()}");
++ }
++ } catch (\Exception $e) {
++ $this->processError($schedule, $e);
++ } finally {
++ $this->lockManager->unlock($lockName);
+ }
+ }
+
+@@ -753,7 +825,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param \Exception $exception
+ * @return void
+ */
+- private function processError(\Magento\Cron\Model\Schedule $schedule, \Exception $exception)
++ private function processError(Schedule $schedule, \Exception $exception)
+ {
+ $schedule->setMessages($exception->getMessage());
+ if ($schedule->getStatus() === Schedule::STATUS_ERROR) {
+@@ -765,4 +837,26 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->logger->info($schedule->getMessages());
+ }
+ }
++
++ /**
++ * Clean up schedule
++ *
++ * @param mixed $where
++ * @return int
++ */
++ private function cleanup($where = ''): int
++ {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_scheduleFactory->create()->getResource();
++
++ return (int) $this->retrier->execute(
++ function () use ($scheduleResource, $where) {
++ return $scheduleResource->getConnection()->delete(
++ $scheduleResource->getTable('cron_schedule'),
++ $where
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++ }
+ }
+diff -Naur a/vendor/magento/module-cron/etc/di.xml b/vendor/magento/module-cron/etc/di.xml
+--- a/vendor/magento/module-cron/etc/di.xml
++++ b/vendor/magento/module-cron/etc/di.xml
+@@ -76,4 +76,5 @@
+
+
+
++
+
diff --git a/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.3.patch b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.3.patch
new file mode 100644
index 0000000..d097926
--- /dev/null
+++ b/patches/MAGECLOUD-4530__fix_cron_deadlocks_and_improve_job_locking__2.3.3.patch
@@ -0,0 +1,527 @@
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrier.php b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrier.php
+@@ -0,0 +1,39 @@
++getTransactionLevel() !== 0) {
++ return $callback();
++ }
++
++ for ($retries = self::MAX_RETRIES - 1; $retries > 0; $retries--) {
++ try {
++ return $callback();
++ } catch (DeadlockException $e) {
++ continue;
++ }
++ }
++
++ return $callback();
++ }
++}
+diff -Naur a/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+new file mode 100644
+--- /dev/null
++++ b/vendor/magento/module-cron/Model/DeadlockRetrierInterface.php
+@@ -0,0 +1,33 @@
++timezoneConverter = $timezoneConverter ?: ObjectManager::getInstance()->get(TimezoneInterface::class);
+ $this->dateTimeFactory = $dateTimeFactory ?: ObjectManager::getInstance()->get(DateTimeFactory::class);
++ $this->retrier = $retrier ?: ObjectManager::getInstance()->get(DeadlockRetrierInterface::class);
+ }
+
+ /**
+@@ -97,7 +105,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ public function setCronExpr($expr)
+ {
+ $e = preg_split('#\s+#', $expr, null, PREG_SPLIT_NO_EMPTY);
+- if (sizeof($e) < 5 || sizeof($e) > 6) {
++ if (count($e) < 5 || count($e) > 6) {
+ throw new CronException(__('Invalid cron expression: %1', $expr));
+ }
+
+@@ -168,7 +176,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ // handle modulus
+ if (strpos($expr, '/') !== false) {
+ $e = explode('/', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'match/modulus\': %1', $expr));
+ }
+ if (!is_numeric($e[1])) {
+@@ -187,7 +195,7 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ } elseif (strpos($expr, '-') !== false) {
+ // handle range
+ $e = explode('-', $expr);
+- if (sizeof($e) !== 2) {
++ if (count($e) !== 2) {
+ throw new CronException(__('Invalid cron expression, expecting \'from-to\' structure: %1', $expr));
+ }
+
+@@ -251,21 +259,42 @@ class Schedule extends \Magento\Framework\Model\AbstractModel
+ }
+
+ /**
+- * Lock the cron job so no other scheduled instances run simultaneously.
++ * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING.
+ *
+- * Sets a job to STATUS_RUNNING only if it is currently in STATUS_PENDING
+- * and no other jobs of the same code are currently in STATUS_RUNNING.
+ * Returns true if status was changed and false otherwise.
+ *
+ * @return boolean
+ */
+ public function tryLockJob()
+ {
+- if ($this->_getResource()->trySetJobUniqueStatusAtomic(
+- $this->getId(),
+- self::STATUS_RUNNING,
+- self::STATUS_PENDING
+- )) {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_getResource();
++
++ // Change statuses from running to error for terminated jobs
++ $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->getConnection()->update(
++ $scheduleResource->getTable('cron_schedule'),
++ ['status' => self::STATUS_ERROR],
++ ['job_code = ?' => $this->getJobCode(), 'status = ?' => self::STATUS_RUNNING]
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ // Change status from pending to running for ran jobs
++ $result = $this->retrier->execute(
++ function () use ($scheduleResource) {
++ return $scheduleResource->trySetJobStatusAtomic(
++ $this->getId(),
++ self::STATUS_RUNNING,
++ self::STATUS_PENDING
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++
++ if ($result) {
+ $this->setStatus(self::STATUS_RUNNING);
+ return true;
+ }
+diff -Naur a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+--- a/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
++++ b/vendor/magento/module-cron/Observer/ProcessCronQueueObserver.php
+@@ -3,6 +3,7 @@
+ * Copyright © Magento, Inc. All rights reserved.
+ * See COPYING.txt for license details.
+ */
++
+ /**
+ * Handling cron jobs
+ */
+@@ -14,11 +15,13 @@ use Magento\Framework\Console\Cli;
+ use Magento\Framework\Event\ObserverInterface;
+ use Magento\Framework\Profiler\Driver\Standard\Stat;
+ use Magento\Framework\Profiler\Driver\Standard\StatFactory;
++use Magento\Cron\Model\DeadlockRetrierInterface;
+
+ /**
+ * The observer for processing cron jobs.
+ *
+ * @SuppressWarnings(PHPMD.CouplingBetweenObjects)
++ * @SuppressWarnings(PHPMD.TooManyFields)
+ */
+ class ProcessCronQueueObserver implements ObserverInterface
+ {
+@@ -62,12 +65,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ /**
+ * How long to wait for cron group to become unlocked
+ */
+- const LOCK_TIMEOUT = 5;
++ const LOCK_TIMEOUT = 60;
+
+ /**
+ * Static lock prefix for cron group locking
+ */
+- const LOCK_PREFIX = 'CRON_GROUP_';
++ const LOCK_PREFIX = 'CRON_';
++
++ /**
++ * Max retries for acquire locks for cron jobs
++ */
++ const MAX_RETRIES = 5;
+
+ /**
+ * @var \Magento\Cron\Model\ResourceModel\Schedule\Collection
+@@ -144,6 +152,16 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private $statProfiler;
+
++ /**
++ * @var \Magento\Framework\Event\ManagerInterface
++ */
++ private $eventManager;
++
++ /**
++ * @var DeadlockRetrierInterface
++ */
++ private $retrier;
++
+ /**
+ * @param \Magento\Framework\ObjectManagerInterface $objectManager
+ * @param \Magento\Cron\Model\ScheduleFactory $scheduleFactory
+@@ -158,6 +176,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param State $state
+ * @param StatFactory $statFactory
+ * @param \Magento\Framework\Lock\LockManagerInterface $lockManager
++ * @param \Magento\Framework\Event\ManagerInterface $eventManager
++ * @param DeadlockRetrierInterface $retrier
+ * @SuppressWarnings(PHPMD.ExcessiveParameterList)
+ */
+ public function __construct(
+@@ -173,7 +193,9 @@ class ProcessCronQueueObserver implements ObserverInterface
+ \Psr\Log\LoggerInterface $logger,
+ \Magento\Framework\App\State $state,
+ StatFactory $statFactory,
+- \Magento\Framework\Lock\LockManagerInterface $lockManager
++ \Magento\Framework\Lock\LockManagerInterface $lockManager,
++ \Magento\Framework\Event\ManagerInterface $eventManager,
++ DeadlockRetrierInterface $retrier
+ ) {
+ $this->_objectManager = $objectManager;
+ $this->_scheduleFactory = $scheduleFactory;
+@@ -188,6 +210,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->state = $state;
+ $this->statProfiler = $statFactory->create();
+ $this->lockManager = $lockManager;
++ $this->eventManager = $eventManager;
++ $this->retrier = $retrier;
+ }
+
+ /**
+@@ -235,12 +259,12 @@ class ProcessCronQueueObserver implements ObserverInterface
+
+ $this->lockGroup(
+ $groupId,
+- function ($groupId) use ($currentTime, $jobsRoot) {
++ function ($groupId) use ($currentTime) {
+ $this->cleanupJobs($groupId, $currentTime);
+ $this->generateSchedules($groupId);
+- $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ );
++ $this->processPendingJobs($groupId, $jobsRoot, $currentTime);
+ }
+ }
+
+@@ -309,9 +333,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ );
+ }
+
+- $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()))->save();
++ $schedule->setExecutedAt(strftime('%Y-%m-%d %H:%M:%S', $this->dateTime->gmtTimestamp()));
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
+
+ $this->startProfiling();
++ $this->eventManager->dispatch('cron_job_run', ['job_name' => 'cron/' . $groupId . '/' . $jobCode]);
++
+ try {
+ $this->logger->info(sprintf('Cron Job %s is run', $jobCode));
+ //phpcs:ignore Magento2.Functions.DiscouragedFunction
+@@ -404,28 +436,6 @@ class ProcessCronQueueObserver implements ObserverInterface
+ return $pendingJobs;
+ }
+
+- /**
+- * Return job collection from database with status 'pending', 'running' or 'success'
+- *
+- * @param string $groupId
+- * @return \Magento\Framework\Model\ResourceModel\Db\Collection\AbstractCollection
+- */
+- private function getNonExitedSchedules($groupId)
+- {
+- $jobs = $this->_config->getJobs();
+- $pendingJobs = $this->_scheduleFactory->create()->getCollection();
+- $pendingJobs->addFieldToFilter(
+- 'status',
+- [
+- 'in' => [
+- Schedule::STATUS_PENDING, Schedule::STATUS_RUNNING, Schedule::STATUS_SUCCESS
+- ]
+- ]
+- );
+- $pendingJobs->addFieldToFilter('job_code', ['in' => array_keys($jobs[$groupId])]);
+- return $pendingJobs;
+- }
+-
+ /**
+ * Generate cron schedule
+ *
+@@ -457,7 +467,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ null
+ );
+
+- $schedules = $this->getNonExitedSchedules($groupId);
++ $schedules = $this->getPendingSchedules($groupId);
+ $exists = [];
+ /** @var Schedule $schedule */
+ foreach ($schedules as $schedule) {
+@@ -531,16 +541,17 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ];
+
+ $jobs = $this->_config->getJobs()[$groupId];
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $connection = $scheduleResource->getConnection();
+ $count = 0;
+ foreach ($historyLifetimes as $status => $time) {
+- $count += $connection->delete(
+- $scheduleResource->getMainTable(),
++ $count += $this->cleanup(
+ [
+ 'status = ?' => $status,
+ 'job_code in (?)' => array_keys($jobs),
+- 'created_at < ?' => $connection->formatDate($currentTime - $time)
++ 'created_at < ?' => $this->_scheduleFactory
++ ->create()
++ ->getResource()
++ ->getConnection()
++ ->formatDate($currentTime - $time)
+ ]
+ );
+ }
+@@ -654,9 +665,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ }
+
+ if (count($jobsToCleanup) > 0) {
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+- $count = $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $count = $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code in (?)' => $jobsToCleanup,
+@@ -697,11 +706,8 @@ class ProcessCronQueueObserver implements ObserverInterface
+ */
+ private function cleanupScheduleMismatches()
+ {
+- /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
+- $scheduleResource = $this->_scheduleFactory->create()->getResource();
+ foreach ($this->invalid as $jobCode => $scheduledAtList) {
+- $scheduleResource->getConnection()->delete(
+- $scheduleResource->getMainTable(),
++ $this->cleanup(
+ [
+ 'status = ?' => Schedule::STATUS_PENDING,
+ 'job_code = ?' => $jobCode,
+@@ -709,6 +715,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ ]
+ );
+ }
++
+ return $this;
+ }
+
+@@ -750,7 +757,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ {
+ $procesedJobs = [];
+ $pendingJobs = $this->getPendingSchedules($groupId);
+- /** @var \Magento\Cron\Model\Schedule $schedule */
++ /** @var Schedule $schedule */
+ foreach ($pendingJobs as $schedule) {
+ if (isset($procesedJobs[$schedule->getJobCode()])) {
+ // process only on job per run
+@@ -766,17 +773,48 @@ class ProcessCronQueueObserver implements ObserverInterface
+ continue;
+ }
+
+- try {
+- if ($schedule->tryLockJob()) {
+- $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
+- }
+- } catch (\Exception $e) {
+- $this->processError($schedule, $e);
+- }
++ $this->tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++
+ if ($schedule->getStatus() === Schedule::STATUS_SUCCESS) {
+ $procesedJobs[$schedule->getJobCode()] = true;
+ }
+- $schedule->save();
++
++ $this->retrier->execute(
++ function () use ($schedule) {
++ $schedule->save();
++ },
++ $schedule->getResource()->getConnection()
++ );
++ }
++ }
++
++ /**
++ * Try to acquire lock for cron job and try to run this job.
++ *
++ * @param int $scheduledTime
++ * @param int $currentTime
++ * @param string[] $jobConfig
++ * @param Schedule $schedule
++ * @param string $groupId
++ */
++ private function tryRunJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId)
++ {
++ // use sha1 to limit length
++ // phpcs:ignore Magento2.Security.InsecureFunction
++ $lockName = self::LOCK_PREFIX . md5($groupId . '_' . $schedule->getJobCode());
++
++ try {
++ for ($retries = self::MAX_RETRIES; $retries > 0; $retries--) {
++ if ($this->lockManager->lock($lockName, 0) && $schedule->tryLockJob()) {
++ $this->_runJob($scheduledTime, $currentTime, $jobConfig, $schedule, $groupId);
++ break;
++ }
++ $this->logger->warning("Could not acquire lock for cron job: {$schedule->getJobCode()}");
++ }
++ } catch (\Exception $e) {
++ $this->processError($schedule, $e);
++ } finally {
++ $this->lockManager->unlock($lockName);
+ }
+ }
+
+@@ -787,7 +825,7 @@ class ProcessCronQueueObserver implements ObserverInterface
+ * @param \Exception $exception
+ * @return void
+ */
+- private function processError(\Magento\Cron\Model\Schedule $schedule, \Exception $exception)
++ private function processError(Schedule $schedule, \Exception $exception)
+ {
+ $schedule->setMessages($exception->getMessage());
+ if ($schedule->getStatus() === Schedule::STATUS_ERROR) {
+@@ -799,4 +837,26 @@ class ProcessCronQueueObserver implements ObserverInterface
+ $this->logger->info($schedule->getMessages());
+ }
+ }
++
++ /**
++ * Clean up schedule
++ *
++ * @param mixed $where
++ * @return int
++ */
++ private function cleanup($where = ''): int
++ {
++ /** @var \Magento\Cron\Model\ResourceModel\Schedule $scheduleResource */
++ $scheduleResource = $this->_scheduleFactory->create()->getResource();
++
++ return (int) $this->retrier->execute(
++ function () use ($scheduleResource, $where) {
++ return $scheduleResource->getConnection()->delete(
++ $scheduleResource->getTable('cron_schedule'),
++ $where
++ );
++ },
++ $scheduleResource->getConnection()
++ );
++ }
+ }
+diff -Naur a/vendor/magento/module-cron/etc/di.xml b/vendor/magento/module-cron/etc/di.xml
+--- a/vendor/magento/module-cron/etc/di.xml
++++ b/vendor/magento/module-cron/etc/di.xml
+@@ -79,4 +79,5 @@
+
+
+
++
+