Skip to content

Commit

Permalink
Apply to transactions only on default connections
Browse files Browse the repository at this point in the history
  • Loading branch information
therezor committed Feb 27, 2018
1 parent 5e515d8 commit d5a4a71
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 32 deletions.
25 changes: 21 additions & 4 deletions src/BusServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@

namespace TheRezor\TransactionalJobs;

use Illuminate\Foundation\Application;
use Illuminate\Support\ServiceProvider;
use Illuminate\Contracts\Bus\Dispatcher as DispatcherContract;
use Illuminate\Contracts\Queue\Factory as QueueFactoryContract;
use Illuminate\Contracts\Bus\QueueingDispatcher as QueueingDispatcherContract;

class BusServiceProvider extends ServiceProvider
{
/**
* Indicates if loading of the provider is deferred.
*
* @var bool
*/
protected $defer = true;

/**
* Register the service provider.
*
Expand All @@ -30,9 +36,20 @@ public function register()
$this->app->alias(
TransactionalDispatcher::class, QueueingDispatcherContract::class
);
}

$this->app->afterResolving('db', function ($db, Application $app) {
$app->make(TransactionalDispatcher::class);
});
/**
* Get the services provided by the provider.
*
* @return array
*/
public function provides()
{
return [
TransactionalDispatcher::class,
DispatcherContract::class,
QueueingDispatcherContract::class,
];
}
}

49 changes: 21 additions & 28 deletions src/TransactionalDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
use Closure;
use Illuminate\Bus\Dispatcher;
use Illuminate\Contracts\Container\Container;
use Illuminate\Database\Events\TransactionBeginning;
use Illuminate\Database\DatabaseManager;
use Illuminate\Database\Events\TransactionCommitted;
use Illuminate\Database\Events\TransactionRolledBack;
use Illuminate\Database\ConnectionInterface;
use Illuminate\Contracts\Events\Dispatcher as DispatcherContract;

class TransactionalDispatcher extends Dispatcher
Expand All @@ -23,69 +22,63 @@ class TransactionalDispatcher extends Dispatcher
*/
protected $eventDispatcher;

/**
* @var DatabaseManager
*/
protected $db;

public function __construct(Container $container, Closure $queueResolver = null)
{
parent::__construct($container, $queueResolver);

$this->eventDispatcher = $container->make(DispatcherContract::class);
$this->db = $container->make('db');
$this->setUpTransactionListeners();
}

public function dispatchToQueue($command)
{
// Dispatch immediately if no transactions was opened during job
if (empty($command->afterTransactions) || empty($this->pendingCommands)) {
if (empty($command->afterTransactions) || 0 === $this->db->transactionLevel()) {
return parent::dispatchToQueue($command);
}

// Add command to pending list
foreach ($this->pendingCommands as $connection => $items) {
$this->pendingCommands[$connection][] = $command;
}
$this->pendingCommands[] = $command;

return null;
}

protected function prepareTransaction(ConnectionInterface $connection)
public function commitTransaction()
{
if (!isset($this->pendingCommands[$connection->getName()])) {
$this->pendingCommands[$connection->getName()] = [];
}
}

public function commitTransaction(ConnectionInterface $connection)
{
if (empty($this->pendingCommands[$connection->getName()]) || $connection->transactionLevel() > 0) {
if (empty($this->pendingCommands) || $this->db->transactionLevel() > 0) {
return;
}

$this->dispatchPendingCommands($connection);
$this->dispatchPendingCommands();
}

public function rollbackTransaction(ConnectionInterface $connection)
public function rollbackTransaction()
{
unset($this->pendingCommands[$connection->getName()]);
$this->pendingCommands = [];
}

protected function dispatchPendingCommands(ConnectionInterface $connection)
protected function dispatchPendingCommands()
{
foreach ($this->pendingCommands[$connection->getName()] as $command) {
foreach ($this->pendingCommands as $command) {
parent::dispatchToQueue($command);
}

unset($this->pendingCommands[$connection->getName()]);
$this->pendingCommands = [];
}

protected function setUpTransactionListeners()
{
$this->eventDispatcher->listen(TransactionBeginning::class, function ($event) {
$this->prepareTransaction($event->connection);
});
$this->eventDispatcher->listen(TransactionCommitted::class, function ($event) {
$this->commitTransaction($event->connection);
$this->eventDispatcher->listen(TransactionCommitted::class, function () {
$this->commitTransaction();
});
$this->eventDispatcher->listen(TransactionRolledBack::class, function ($event) {
$this->rollbackTransaction($event->connection);
$this->eventDispatcher->listen(TransactionRolledBack::class, function () {
$this->rollbackTransaction();
});
}
}

0 comments on commit d5a4a71

Please sign in to comment.