-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Closes #4052
- Loading branch information
Showing
6 changed files
with
638 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
359 changes: 359 additions & 0 deletions
359
lib/Doctrine/DBAL/Connections/PrimaryReplicaConnection.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,359 @@ | ||
<?php | ||
|
||
namespace Doctrine\DBAL\Connections; | ||
|
||
use Doctrine\Common\EventManager; | ||
use Doctrine\DBAL\Configuration; | ||
use Doctrine\DBAL\Connection; | ||
use Doctrine\DBAL\Driver; | ||
use Doctrine\DBAL\Driver\Connection as DriverConnection; | ||
use Doctrine\DBAL\Driver\Statement; | ||
use Doctrine\DBAL\Event\ConnectionEventArgs; | ||
use Doctrine\DBAL\Events; | ||
use InvalidArgumentException; | ||
|
||
use function array_rand; | ||
use function assert; | ||
use function count; | ||
use function func_get_args; | ||
|
||
/** | ||
* Primary-Replica Connection | ||
* | ||
* Connection can be used with primary-replica setups. | ||
* | ||
* Important for the understanding of this connection should be how and when | ||
* it picks the replica or primary. | ||
* | ||
* 1. Replica if primary was never picked before and ONLY if 'getWrappedConnection' | ||
* or 'executeQuery' is used. | ||
* 2. Primary picked when 'exec', 'executeUpdate', 'insert', 'delete', 'update', 'createSavepoint', | ||
* 'releaseSavepoint', 'beginTransaction', 'rollback', 'commit', 'query' or | ||
* 'prepare' is called. | ||
* 3. If Primary was picked once during the lifetime of the connection it will always get picked afterwards. | ||
* 4. One replica connection is randomly picked ONCE during a request. | ||
* | ||
* ATTENTION: You can write to the replica with this connection if you execute a write query without | ||
* opening up a transaction. For example: | ||
* | ||
* $conn = DriverManager::getConnection(...); | ||
* $conn->executeQuery("DELETE FROM table"); | ||
* | ||
* Be aware that Connection#executeQuery is a method specifically for READ | ||
* operations only. | ||
* | ||
* This connection is limited to replica operations using the | ||
* Connection#executeQuery operation only, because it wouldn't be compatible | ||
* with the ORM or SchemaManager code otherwise. Both use all the other | ||
* operations in a context where writes could happen to a replica, which makes | ||
* this restricted approach necessary. | ||
* | ||
* You can manually connect to the primary at any time by calling: | ||
* | ||
* $conn->connect('primary'); | ||
* | ||
* Instantiation through the DriverManager looks like: | ||
* | ||
* @example | ||
* | ||
* $conn = DriverManager::getConnection(array( | ||
* 'wrapperClass' => 'Doctrine\DBAL\Connections\PrimaryReplicaConnection', | ||
* 'driver' => 'pdo_mysql', | ||
* 'primary' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''), | ||
* 'replica' => array( | ||
* array('user' => 'replica1', 'password', 'host' => '', 'dbname' => ''), | ||
* array('user' => 'replica2', 'password', 'host' => '', 'dbname' => ''), | ||
* ) | ||
* )); | ||
* | ||
* You can also pass 'driverOptions' and any other documented option to each of this drivers to pass additional information. | ||
*/ | ||
class PrimaryReplicaConnection extends Connection | ||
{ | ||
/** | ||
* Primary and Replica connection (one of the randomly picked replicas). | ||
* | ||
* @var DriverConnection[]|null[] | ||
*/ | ||
private $connections = ['primary' => null, 'replica' => null]; | ||
|
||
/** | ||
* You can keep the replica connection and then switch back to it | ||
* during the request if you know what you are doing. | ||
* | ||
* @var bool | ||
*/ | ||
private $keepReplica = false; | ||
|
||
/** | ||
* Creates Primary Replica Connection. | ||
* | ||
* @param mixed[] $params | ||
* | ||
* @throws InvalidArgumentException | ||
*/ | ||
public function __construct( | ||
array $params, | ||
Driver $driver, | ||
?Configuration $config = null, | ||
?EventManager $eventManager = null | ||
) { | ||
if (! isset($params['replica'], $params['primary'])) { | ||
throw new InvalidArgumentException('primary or replica configuration missing'); | ||
} | ||
|
||
if (count($params['replica']) === 0) { | ||
throw new InvalidArgumentException('You have to configure at least one replica.'); | ||
} | ||
|
||
$params['primary']['driver'] = $params['driver']; | ||
foreach ($params['replica'] as $replicaKey => $replica) { | ||
$params['replica'][$replicaKey]['driver'] = $params['driver']; | ||
} | ||
|
||
$this->keepReplica = (bool) ($params['keepReplica'] ?? false); | ||
|
||
parent::__construct($params, $driver, $config, $eventManager); | ||
} | ||
|
||
/** | ||
* Checks if the connection is currently towards the primary or not. | ||
*/ | ||
public function isConnectedToPrimary(): bool | ||
{ | ||
return $this->_conn !== null && $this->_conn === $this->connections['primary']; | ||
} | ||
|
||
public function connect(?string $connectionName = null): bool | ||
{ | ||
$requestedConnectionChange = ($connectionName !== null); | ||
$connectionName = $connectionName ?: 'replica'; | ||
|
||
if ($connectionName !== 'replica' && $connectionName !== 'primary') { | ||
throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.'); | ||
} | ||
|
||
// If we have a connection open, and this is not an explicit connection | ||
// change request, then abort right here, because we are already done. | ||
// This prevents writes to the replica in case of "keepReplica" option enabled. | ||
if ($this->_conn !== null && ! $requestedConnectionChange) { | ||
return false; | ||
} | ||
|
||
$forcePrimaryAsReplica = false; | ||
|
||
if ($this->getTransactionNestingLevel() > 0) { | ||
$connectionName = 'primary'; | ||
$forcePrimaryAsReplica = true; | ||
} | ||
|
||
if (isset($this->connections[$connectionName])) { | ||
$this->_conn = $this->connections[$connectionName]; | ||
|
||
if ($forcePrimaryAsReplica && ! $this->keepReplica) { | ||
$this->connections['replica'] = $this->_conn; | ||
} | ||
|
||
return false; | ||
} | ||
|
||
if ($connectionName === 'primary') { | ||
$this->connections['primary'] = $this->_conn = $this->connectTo($connectionName); | ||
|
||
// Set replica connection to primary to avoid invalid reads | ||
if (! $this->keepReplica) { | ||
$this->connections['replica'] = $this->connections['primary']; | ||
} | ||
} else { | ||
$this->connections['replica'] = $this->_conn = $this->connectTo($connectionName); | ||
} | ||
|
||
if ($this->_eventManager->hasListeners(Events::postConnect)) { | ||
$eventArgs = new ConnectionEventArgs($this); | ||
$this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs); | ||
} | ||
|
||
return true; | ||
} | ||
|
||
/** | ||
* Connects to a specific connection. | ||
*/ | ||
private function connectTo(string $connectionName): DriverConnection | ||
{ | ||
$params = $this->getParams(); | ||
|
||
$driverOptions = $params['driverOptions'] ?? []; | ||
|
||
$connectionParams = $this->chooseConnectionConfiguration($connectionName, $params); | ||
|
||
$user = $connectionParams['user'] ?? null; | ||
$password = $connectionParams['password'] ?? null; | ||
|
||
return $this->_driver->connect($connectionParams, $user, $password, $driverOptions); | ||
} | ||
|
||
/** | ||
* @param mixed[] $params | ||
* | ||
* @return mixed[] | ||
*/ | ||
private function chooseConnectionConfiguration(string $connectionName, array $params): array | ||
{ | ||
if ($connectionName === 'primary') { | ||
return $params['primary']; | ||
} | ||
|
||
$config = $params['replica'][array_rand($params['replica'])]; | ||
|
||
if (! isset($config['charset']) && isset($params['primary']['charset'])) { | ||
$config['charset'] = $params['primary']['charset']; | ||
} | ||
|
||
return $config; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function executeUpdate($query, array $params = [], array $types = []): int | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::executeUpdate($query, $params, $types); | ||
} | ||
|
||
public function beginTransaction(): bool | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::beginTransaction(); | ||
} | ||
|
||
public function commit(): bool | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::commit(); | ||
} | ||
|
||
public function rollBack(): bool | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::rollBack(); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function delete($tableName, array $identifier, array $types = []): int | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::delete($tableName, $identifier, $types); | ||
} | ||
|
||
public function close(): void | ||
{ | ||
unset($this->connections['primary'], $this->connections['replica']); | ||
|
||
parent::close(); | ||
|
||
$this->_conn = null; | ||
$this->connections = ['primary' => null, 'replica' => null]; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function update($tableName, array $data, array $identifier, array $types = []): int | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::update($tableName, $data, $identifier, $types); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function insert($tableName, array $data, array $types = []): int | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::insert($tableName, $data, $types); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function exec($statement): int | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::exec($statement); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function createSavepoint($savepoint): void | ||
{ | ||
$this->connect('primary'); | ||
|
||
parent::createSavepoint($savepoint); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function releaseSavepoint($savepoint): void | ||
{ | ||
$this->connect('primary'); | ||
|
||
parent::releaseSavepoint($savepoint); | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function rollbackSavepoint($savepoint): void | ||
{ | ||
$this->connect('primary'); | ||
|
||
parent::rollbackSavepoint($savepoint); | ||
} | ||
|
||
public function query(): Statement | ||
{ | ||
$this->connect('primary'); | ||
assert($this->_conn instanceof DriverConnection); | ||
|
||
$args = func_get_args(); | ||
|
||
$logger = $this->getConfiguration()->getSQLLogger(); | ||
if ($logger) { | ||
$logger->startQuery($args[0]); | ||
} | ||
|
||
$statement = $this->_conn->query(...$args); | ||
|
||
$statement->setFetchMode($this->defaultFetchMode); | ||
|
||
if ($logger) { | ||
$logger->stopQuery(); | ||
} | ||
|
||
return $statement; | ||
} | ||
|
||
/** | ||
* {@inheritDoc} | ||
*/ | ||
public function prepare($statement): Statement | ||
{ | ||
$this->connect('primary'); | ||
|
||
return parent::prepare($statement); | ||
} | ||
} |
Oops, something went wrong.