diff --git a/lib/Doctrine/DBAL/Connections/MasterSlaveConnection.php b/lib/Doctrine/DBAL/Connections/MasterSlaveConnection.php index 9362bc0c636..1bce36957c6 100644 --- a/lib/Doctrine/DBAL/Connections/MasterSlaveConnection.php +++ b/lib/Doctrine/DBAL/Connections/MasterSlaveConnection.php @@ -53,6 +53,8 @@ * * Instantiation through the DriverManager looks like: * + * @deprecated use PrimaryReplicaConnection instead + * * @example * * $conn = DriverManager::getConnection(array( diff --git a/lib/Doctrine/DBAL/Connections/PrimaryReplicaConnection.php b/lib/Doctrine/DBAL/Connections/PrimaryReplicaConnection.php new file mode 100644 index 00000000000..5aafa3672c2 --- /dev/null +++ b/lib/Doctrine/DBAL/Connections/PrimaryReplicaConnection.php @@ -0,0 +1,359 @@ +executeQuery("DELETE FROM table"); + * + * Be aware that Connection#executeQuery is a method specifically for READ + * operations only. + * + * This connection is limited to replica operations using the + * Connection#executeQuery operation only, because it wouldn't be compatible + * with the ORM or SchemaManager code otherwise. Both use all the other + * operations in a context where writes could happen to a replica, which makes + * this restricted approach necessary. + * + * You can manually connect to the primary at any time by calling: + * + * $conn->connect('primary'); + * + * Instantiation through the DriverManager looks like: + * + * @example + * + * $conn = DriverManager::getConnection(array( + * 'wrapperClass' => 'Doctrine\DBAL\Connections\PrimaryReplicaConnection', + * 'driver' => 'pdo_mysql', + * 'primary' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''), + * 'replica' => array( + * array('user' => 'replica1', 'password', 'host' => '', 'dbname' => ''), + * array('user' => 'replica2', 'password', 'host' => '', 'dbname' => ''), + * ) + * )); + * + * You can also pass 'driverOptions' and any other documented option to each of this drivers to pass additional information. + */ +final class PrimaryReplicaConnection extends Connection +{ + /** + * Primary and Replica connection (one of the randomly picked replicas). + * + * @var DriverConnection[]|null[] + */ + private $connections = ['primary' => null, 'replica' => null]; + + /** + * You can keep the replica connection and then switch back to it + * during the request if you know what you are doing. + * + * @var bool + */ + private $keepReplica = false; + + /** + * Creates Primary Replica Connection. + * + * @param mixed[] $params + * + * @throws InvalidArgumentException + */ + public function __construct( + array $params, + Driver $driver, + ?Configuration $config = null, + ?EventManager $eventManager = null + ) { + if (! isset($params['replica'], $params['primary'])) { + throw new InvalidArgumentException('primary or replica configuration missing'); + } + + if (count($params['replica']) === 0) { + throw new InvalidArgumentException('You have to configure at least one replica.'); + } + + $params['primary']['driver'] = $params['driver']; + foreach ($params['replica'] as $replicaKey => $replica) { + $params['replica'][$replicaKey]['driver'] = $params['driver']; + } + + $this->keepReplica = (bool) ($params['keepReplica'] ?? false); + + parent::__construct($params, $driver, $config, $eventManager); + } + + /** + * Checks if the connection is currently towards the primary or not. + */ + public function isConnectedToPrimary(): bool + { + return $this->_conn !== null && $this->_conn === $this->connections['primary']; + } + + public function connect(?string $connectionName = null): bool + { + $requestedConnectionChange = ($connectionName !== null); + $connectionName = $connectionName ?: 'replica'; + + if ($connectionName !== 'replica' && $connectionName !== 'primary') { + throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.'); + } + + // If we have a connection open, and this is not an explicit connection + // change request, then abort right here, because we are already done. + // This prevents writes to the replica in case of "keepReplica" option enabled. + if ($this->_conn !== null && ! $requestedConnectionChange) { + return false; + } + + $forcePrimaryAsReplica = false; + + if ($this->getTransactionNestingLevel() > 0) { + $connectionName = 'primary'; + $forcePrimaryAsReplica = true; + } + + if (isset($this->connections[$connectionName])) { + $this->_conn = $this->connections[$connectionName]; + + if ($forcePrimaryAsReplica && ! $this->keepReplica) { + $this->connections['replica'] = $this->_conn; + } + + return false; + } + + if ($connectionName === 'primary') { + $this->connections['primary'] = $this->_conn = $this->connectTo($connectionName); + + // Set replica connection to primary to avoid invalid reads + if (! $this->keepReplica) { + $this->connections['replica'] = $this->connections['primary']; + } + } else { + $this->connections['replica'] = $this->_conn = $this->connectTo($connectionName); + } + + if ($this->_eventManager->hasListeners(Events::postConnect)) { + $eventArgs = new ConnectionEventArgs($this); + $this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs); + } + + return true; + } + + /** + * Connects to a specific connection. + */ + private function connectTo(string $connectionName): DriverConnection + { + $params = $this->getParams(); + + $driverOptions = $params['driverOptions'] ?? []; + + $connectionParams = $this->chooseConnectionConfiguration($connectionName, $params); + + $user = $connectionParams['user'] ?? null; + $password = $connectionParams['password'] ?? null; + + return $this->_driver->connect($connectionParams, $user, $password, $driverOptions); + } + + /** + * @param mixed[] $params + * + * @return mixed[] + */ + private function chooseConnectionConfiguration(string $connectionName, array $params): array + { + if ($connectionName === 'primary') { + return $params['primary']; + } + + $config = $params['replica'][array_rand($params['replica'])]; + + if (! isset($config['charset']) && isset($params['primary']['charset'])) { + $config['charset'] = $params['primary']['charset']; + } + + return $config; + } + + /** + * {@inheritDoc} + */ + public function executeUpdate($query, array $params = [], array $types = []): int + { + $this->connect('primary'); + + return parent::executeUpdate($query, $params, $types); + } + + public function beginTransaction(): bool + { + $this->connect('primary'); + + return parent::beginTransaction(); + } + + public function commit(): bool + { + $this->connect('primary'); + + return parent::commit(); + } + + public function rollBack(): bool + { + $this->connect('primary'); + + return parent::rollBack(); + } + + /** + * {@inheritDoc} + */ + public function delete($tableName, array $identifier, array $types = []): int + { + $this->connect('primary'); + + return parent::delete($tableName, $identifier, $types); + } + + public function close(): void + { + unset($this->connections['primary'], $this->connections['replica']); + + parent::close(); + + $this->_conn = null; + $this->connections = ['primary' => null, 'replica' => null]; + } + + /** + * {@inheritDoc} + */ + public function update($tableName, array $data, array $identifier, array $types = []): int + { + $this->connect('primary'); + + return parent::update($tableName, $data, $identifier, $types); + } + + /** + * {@inheritDoc} + */ + public function insert($tableName, array $data, array $types = []): int + { + $this->connect('primary'); + + return parent::insert($tableName, $data, $types); + } + + /** + * {@inheritDoc} + */ + public function exec($statement): int + { + $this->connect('primary'); + + return parent::exec($statement); + } + + /** + * {@inheritDoc} + */ + public function createSavepoint($savepoint): void + { + $this->connect('primary'); + + parent::createSavepoint($savepoint); + } + + /** + * {@inheritDoc} + */ + public function releaseSavepoint($savepoint): void + { + $this->connect('primary'); + + parent::releaseSavepoint($savepoint); + } + + /** + * {@inheritDoc} + */ + public function rollbackSavepoint($savepoint): void + { + $this->connect('primary'); + + parent::rollbackSavepoint($savepoint); + } + + public function query(): Statement + { + $this->connect('primary'); + assert($this->_conn instanceof DriverConnection); + + $args = func_get_args(); + + $logger = $this->getConfiguration()->getSQLLogger(); + if ($logger) { + $logger->startQuery($args[0]); + } + + $statement = $this->_conn->query(...$args); + + $statement->setFetchMode($this->defaultFetchMode); + + if ($logger) { + $logger->stopQuery(); + } + + return $statement; + } + + /** + * {@inheritDoc} + */ + public function prepare($statement): Statement + { + $this->connect('primary'); + + return parent::prepare($statement); + } +} diff --git a/lib/Doctrine/DBAL/DriverManager.php b/lib/Doctrine/DBAL/DriverManager.php index f7462bbd462..b56ac7f9054 100644 --- a/lib/Doctrine/DBAL/DriverManager.php +++ b/lib/Doctrine/DBAL/DriverManager.php @@ -142,17 +142,29 @@ public static function getConnection( $params = self::parseDatabaseUrl($params); - // URL support for MasterSlaveConnection + // @todo: deprecated, notice thrown by connection constructor if (isset($params['master'])) { $params['master'] = self::parseDatabaseUrl($params['master']); } + // @todo: deprecated, notice thrown by connection constructor if (isset($params['slaves'])) { foreach ($params['slaves'] as $key => $slaveParams) { $params['slaves'][$key] = self::parseDatabaseUrl($slaveParams); } } + // URL support for PrimaryReplicaConnection + if (isset($params['primary'])) { + $params['primary'] = self::parseDatabaseUrl($params['primary']); + } + + if (isset($params['replica'])) { + foreach ($params['replica'] as $key => $replicaParams) { + $params['replica'][$key] = self::parseDatabaseUrl($replicaParams); + } + } + // URL support for PoolingShardConnection if (isset($params['global'])) { $params['global'] = self::parseDatabaseUrl($params['global']); diff --git a/tests/Doctrine/Tests/DBAL/DriverManagerTest.php b/tests/Doctrine/Tests/DBAL/DriverManagerTest.php index 524e03031c4..c5e9bf38bd7 100644 --- a/tests/Doctrine/Tests/DBAL/DriverManagerTest.php +++ b/tests/Doctrine/Tests/DBAL/DriverManagerTest.php @@ -3,7 +3,7 @@ namespace Doctrine\Tests\DBAL; use Doctrine\DBAL\Connection; -use Doctrine\DBAL\Connections\MasterSlaveConnection; +use Doctrine\DBAL\Connections\PrimaryReplicaConnection; use Doctrine\DBAL\DBALException; use Doctrine\DBAL\Driver; use Doctrine\DBAL\Driver\DrizzlePDOMySql\Driver as DrizzlePDOMySqlDriver; @@ -139,15 +139,15 @@ public function testValidDriverClass(): void self::assertInstanceOf(PDOMySQLDriver::class, $conn->getDriver()); } - public function testDatabaseUrlMasterSlave(): void + public function testDatabaseUrlPrimaryReplica(): void { $options = [ 'driver' => 'pdo_mysql', - 'master' => ['url' => 'mysql://foo:bar@localhost:11211/baz'], - 'slaves' => [ - 'slave1' => ['url' => 'mysql://foo:bar@localhost:11211/baz_slave'], + 'primary' => ['url' => 'mysql://foo:bar@localhost:11211/baz'], + 'replica' => [ + 'replica1' => ['url' => 'mysql://foo:bar@localhost:11211/baz_replica'], ], - 'wrapperClass' => MasterSlaveConnection::class, + 'wrapperClass' => PrimaryReplicaConnection::class, ]; $conn = DriverManager::getConnection($options); @@ -163,12 +163,12 @@ public function testDatabaseUrlMasterSlave(): void ]; foreach ($expected as $key => $value) { - self::assertEquals($value, $params['master'][$key]); - self::assertEquals($value, $params['slaves']['slave1'][$key]); + self::assertEquals($value, $params['primary'][$key]); + self::assertEquals($value, $params['replica']['replica1'][$key]); } - self::assertEquals('baz', $params['master']['dbname']); - self::assertEquals('baz_slave', $params['slaves']['slave1']['dbname']); + self::assertEquals('baz', $params['primary']['dbname']); + self::assertEquals('baz_replica', $params['replica']['replica1']['dbname']); } public function testDatabaseUrlShard(): void diff --git a/tests/Doctrine/Tests/DBAL/Functional/PrimaryReplicaConnectionTest.php b/tests/Doctrine/Tests/DBAL/Functional/PrimaryReplicaConnectionTest.php new file mode 100644 index 00000000000..4ddabeb8336 --- /dev/null +++ b/tests/Doctrine/Tests/DBAL/Functional/PrimaryReplicaConnectionTest.php @@ -0,0 +1,243 @@ +connection->getDatabasePlatform()->getName(); + + // This is a MySQL specific test, skip other vendors. + if ($platformName !== 'mysql') { + $this->markTestSkipped(sprintf('Test does not work on %s.', $platformName)); + } + + try { + $table = new Table('primary_replica_table'); + $table->addColumn('test_int', 'integer'); + $table->setPrimaryKey(['test_int']); + + $sm = $this->connection->getSchemaManager(); + $sm->createTable($table); + } catch (Throwable $e) { + } + + $this->connection->executeUpdate('DELETE FROM primary_replica_table'); + $this->connection->insert('primary_replica_table', ['test_int' => 1]); + } + + private function createPrimaryReplicaConnection(bool $keepReplica = false): PrimaryReplicaConnection + { + return DriverManager::getConnection($this->createPrimaryReplicaConnectionParams($keepReplica)); + } + + /** + * @return mixed[] + */ + private function createPrimaryReplicaConnectionParams(bool $keepReplica = false): array + { + $params = $this->connection->getParams(); + $params['primary'] = $params; + $params['replica'] = [$params, $params]; + $params['keepReplica'] = $keepReplica; + $params['wrapperClass'] = PrimaryReplicaConnection::class; + + return $params; + } + + public function testInheritCharsetFromPrimary(): void + { + $charsets = [ + 'utf8', + 'latin1', + ]; + + foreach ($charsets as $charset) { + $params = $this->createPrimaryReplicaConnectionParams(); + $params['primary']['charset'] = $charset; + + foreach ($params['replica'] as $index => $replicaParams) { + if (! isset($replicaParams['charset'])) { + continue; + } + + unset($params['replica'][$index]['charset']); + } + + $conn = DriverManager::getConnection($params); + self::assertInstanceOf(PrimaryReplicaConnection::class, $conn); + $conn->connect('replica'); + + self::assertFalse($conn->isConnectedToPrimary()); + + $clientCharset = $conn->fetchColumn('select @@character_set_client as c'); + + self::assertSame( + $charset, + substr(strtolower($clientCharset), 0, strlen($charset)) + ); + } + } + + public function testPrimaryOnConnect(): void + { + $conn = $this->createPrimaryReplicaConnection(); + + self::assertFalse($conn->isConnectedToPrimary()); + $conn->connect('replica'); + self::assertFalse($conn->isConnectedToPrimary()); + $conn->connect('primary'); + self::assertTrue($conn->isConnectedToPrimary()); + } + + public function testNoPrimaryrOnExecuteQuery(): void + { + $conn = $this->createPrimaryReplicaConnection(); + + $sql = 'SELECT count(*) as num FROM primary_replica_table'; + $data = $conn->fetchAll($sql); + $data[0] = array_change_key_case($data[0], CASE_LOWER); + + self::assertEquals(1, $data[0]['num']); + self::assertFalse($conn->isConnectedToPrimary()); + } + + public function testPrimaryOnWriteOperation(): void + { + $conn = $this->createPrimaryReplicaConnection(); + $conn->insert('primary_replica_table', ['test_int' => 30]); + + self::assertTrue($conn->isConnectedToPrimary()); + + $sql = 'SELECT count(*) as num FROM primary_replica_table'; + $data = $conn->fetchAll($sql); + $data[0] = array_change_key_case($data[0], CASE_LOWER); + + self::assertEquals(2, $data[0]['num']); + self::assertTrue($conn->isConnectedToPrimary()); + } + + /** + * @group DBAL-335 + */ + public function testKeepReplicaBeginTransactionStaysOnPrimary(): void + { + $conn = $this->createPrimaryReplicaConnection($keepReplica = true); + $conn->connect('replica'); + + $conn->beginTransaction(); + $conn->insert('primary_replica_table', ['test_int' => 30]); + $conn->commit(); + + self::assertTrue($conn->isConnectedToPrimary()); + + $conn->connect(); + self::assertTrue($conn->isConnectedToPrimary()); + + $conn->connect('replica'); + self::assertFalse($conn->isConnectedToPrimary()); + } + + /** + * @group DBAL-335 + */ + public function testKeepReplicaInsertStaysOnPrimary(): void + { + $conn = $this->createPrimaryReplicaConnection($keepReplica = true); + $conn->connect('replica'); + + $conn->insert('primary_replica_table', ['test_int' => 30]); + + self::assertTrue($conn->isConnectedToPrimary()); + + $conn->connect(); + self::assertTrue($conn->isConnectedToPrimary()); + + $conn->connect('replica'); + self::assertFalse($conn->isConnectedToPrimary()); + } + + public function testPrimaryReplicaConnectionCloseAndReconnect(): void + { + $conn = $this->createPrimaryReplicaConnection(); + $conn->connect('primary'); + self::assertTrue($conn->isConnectedToPrimary()); + + $conn->close(); + self::assertFalse($conn->isConnectedToPrimary()); + + $conn->connect('primary'); + self::assertTrue($conn->isConnectedToPrimary()); + } + + public function testQueryOnPrimary(): void + { + $conn = $this->createPrimaryReplicaConnection(); + + $query = 'SELECT count(*) as num FROM primary_replica_table'; + + $statement = $conn->query($query); + + self::assertInstanceOf(Statement::class, $statement); + + //Query must be executed only on Primary + self::assertTrue($conn->isConnectedToPrimary()); + + $data = $statement->fetchAll(); + + //Default fetchmode is FetchMode::ASSOCIATIVE + self::assertArrayHasKey(0, $data); + self::assertArrayHasKey('num', $data[0]); + + //Could be set in other fetchmodes + self::assertArrayNotHasKey(0, $data[0]); + self::assertEquals(1, $data[0]['num']); + } + + public function testQueryOnReplica(): void + { + $conn = $this->createPrimaryReplicaConnection(); + $conn->connect('replica'); + + $query = 'SELECT count(*) as num FROM primary_replica_table'; + + $statement = $conn->query($query); + + self::assertInstanceOf(Statement::class, $statement); + + //Query must be executed only on Primary, even when we connect to the replica + self::assertTrue($conn->isConnectedToPrimary()); + + $data = $statement->fetchAll(); + + //Default fetchmode is FetchMode::ASSOCIATIVE + self::assertArrayHasKey(0, $data); + self::assertArrayHasKey('num', $data[0]); + + //Could be set in other fetchmodes + self::assertArrayNotHasKey(0, $data[0]); + + self::assertEquals(1, $data[0]['num']); + } +}