Skip to content

Commit

Permalink
WIP: fix api
Browse files Browse the repository at this point in the history
  • Loading branch information
greg0ire committed Jun 7, 2020
1 parent 52da3f5 commit 0f368ee
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 69 deletions.
106 changes: 54 additions & 52 deletions lib/Doctrine/DBAL/Connections/PrimaryReplicaConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,15 @@
*/
class PrimaryReplicaConnection extends Connection
{
/**
* Primary and Replica connection (one of the randomly picked replicas).
*
* @var DriverConnection[]|null[]
*/
private $connections = ['primary' => null, 'replica' => null];
/** @var DriverConnection|null */
private $primary;

/**
* You can keep the replica connection and then switch back to it
* during the request if you know what you are doing.
* One replica picked at random
*
* @var bool
* @var DriverConnection|null
*/
private $keepReplica = false;
private $replica;

/**
* Creates Primary Replica Connection.
Expand Down Expand Up @@ -111,32 +106,42 @@ public function __construct(
$params['replica'][$replicaKey]['driver'] = $params['driver'];
}

$this->keepReplica = (bool) ($params['keepReplica'] ?? false);

parent::__construct($params, $driver, $config, $eventManager);
}

/**
* Checks if the connection is currently towards the primary or not.
*/
public function isConnectedToPrimary(): bool
private function switchToPrimary(): bool
{
return $this->_conn !== null && $this->_conn === $this->connections['primary'];
}
$forcePrimaryAsReplica = $this->getTransactionNestingLevel() > 0;

public function connect(?string $connectionName = null): bool
{
$requestedConnectionChange = ($connectionName !== null);
$connectionName = $connectionName ?: 'replica';
if (isset($this->primary)) {
$this->_conn = $this->primary;

if ($forcePrimaryAsReplica) {
$this->replica = $this->_conn;
}

if ($connectionName !== 'replica' && $connectionName !== 'primary') {
throw new InvalidArgumentException('Invalid option to connect(), only primary or replica allowed.');
return false;
}

// If we have a connection open, and this is not an explicit connection
// change request, then abort right here, because we are already done.
// This prevents writes to the replica in case of "keepReplica" option enabled.
if ($this->_conn !== null && ! $requestedConnectionChange) {
$this->primary = $this->_conn = $this->connectTo('primary');

// Set replica connection to primary to avoid invalid reads
$this->replica = $this->primary;

if ($this->_eventManager->hasListeners(Events::postConnect)) {
$eventArgs = new ConnectionEventArgs($this);
$this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs);
}

return true;
}

public function connect(): bool
{
$connectionName = 'replica';

if ($this->_conn !== null) {
return false;
}

Expand All @@ -147,25 +152,23 @@ public function connect(?string $connectionName = null): bool
$forcePrimaryAsReplica = true;
}

if (isset($this->connections[$connectionName])) {
$this->_conn = $this->connections[$connectionName];
if (isset($this->$connectionName)) {
$this->_conn = $this->$connectionName;

if ($forcePrimaryAsReplica && ! $this->keepReplica) {
$this->connections['replica'] = $this->_conn;
if ($forcePrimaryAsReplica) {
$this->replica = $this->_conn;
}

return false;
}

if ($connectionName === 'primary') {
$this->connections['primary'] = $this->_conn = $this->connectTo($connectionName);
$this->primary = $this->_conn = $this->connectTo($connectionName);

// Set replica connection to primary to avoid invalid reads
if (! $this->keepReplica) {
$this->connections['replica'] = $this->connections['primary'];
}
$this->replica = $this->primary;
} else {
$this->connections['replica'] = $this->_conn = $this->connectTo($connectionName);
$this->replica = $this->_conn = $this->connectTo($connectionName);
}

if ($this->_eventManager->hasListeners(Events::postConnect)) {
Expand Down Expand Up @@ -218,28 +221,28 @@ private function chooseConnectionConfiguration(string $connectionName, array $pa
*/
public function executeUpdate($query, array $params = [], array $types = []): int
{
$this->connect('primary');
$this->switchToPrimary();

return parent::executeUpdate($query, $params, $types);
}

public function beginTransaction(): bool
{
$this->connect('primary');
$this->switchToPrimary();

return parent::beginTransaction();
}

public function commit(): bool
{
$this->connect('primary');
$this->switchToPrimary();

return parent::commit();
}

public function rollBack(): bool
{
$this->connect('primary');
$this->switchToPrimary();

return parent::rollBack();
}
Expand All @@ -249,27 +252,26 @@ public function rollBack(): bool
*/
public function delete($tableName, array $identifier, array $types = []): int
{
$this->connect('primary');
$this->switchToPrimary();

return parent::delete($tableName, $identifier, $types);
}

public function close(): void
{
unset($this->connections['primary'], $this->connections['replica']);
unset($this->primary, $this->replica);

parent::close();

$this->_conn = null;
$this->connections = ['primary' => null, 'replica' => null];
$this->_conn = null;
}

/**
* {@inheritDoc}
*/
public function update($tableName, array $data, array $identifier, array $types = []): int
{
$this->connect('primary');
$this->switchToPrimary();

return parent::update($tableName, $data, $identifier, $types);
}
Expand All @@ -279,7 +281,7 @@ public function update($tableName, array $data, array $identifier, array $types
*/
public function insert($tableName, array $data, array $types = []): int
{
$this->connect('primary');
$this->switchToPrimary();

return parent::insert($tableName, $data, $types);
}
Expand All @@ -289,7 +291,7 @@ public function insert($tableName, array $data, array $types = []): int
*/
public function exec($statement): int
{
$this->connect('primary');
$this->switchToPrimary();

return parent::exec($statement);
}
Expand All @@ -299,7 +301,7 @@ public function exec($statement): int
*/
public function createSavepoint($savepoint): void
{
$this->connect('primary');
$this->switchToPrimary();

parent::createSavepoint($savepoint);
}
Expand All @@ -309,7 +311,7 @@ public function createSavepoint($savepoint): void
*/
public function releaseSavepoint($savepoint): void
{
$this->connect('primary');
$this->switchToPrimary();

parent::releaseSavepoint($savepoint);
}
Expand All @@ -319,14 +321,14 @@ public function releaseSavepoint($savepoint): void
*/
public function rollbackSavepoint($savepoint): void
{
$this->connect('primary');
$this->switchToPrimary();

parent::rollbackSavepoint($savepoint);
}

public function query(): Statement
{
$this->connect('primary');
$this->switchToPrimary();
assert($this->_conn instanceof DriverConnection);

$args = func_get_args();
Expand All @@ -352,7 +354,7 @@ public function query(): Statement
*/
public function prepare($statement): Statement
{
$this->connect('primary');
$this->switchToPrimary();

return parent::prepare($statement);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@ private function createPrimaryReplicaConnection(bool $keepReplica = false): Prim
/**
* @return mixed[]
*/
private function createPrimaryReplicaConnectionParams(bool $keepReplica = false): array
private function createPrimaryReplicaConnectionParams(): array
{
$params = $this->connection->getParams();
$params['primary'] = $params;
$params['replica'] = [$params, $params];
$params['keepReplica'] = $keepReplica;
$params['wrapperClass'] = PrimaryReplicaConnection::class;

return $params;
Expand All @@ -87,9 +86,7 @@ public function testInheritCharsetFromPrimary(): void

$conn = DriverManager::getConnection($params);
self::assertInstanceOf(PrimaryReplicaConnection::class, $conn);
$conn->connect('replica');

self::assertFalse($conn->isConnectedToPrimary());
$conn->connect();

$clientCharset = $conn->fetchColumn('select @@character_set_client as c');

Expand All @@ -100,18 +97,7 @@ public function testInheritCharsetFromPrimary(): void
}
}

public function testPrimaryOnConnect(): void
{
$conn = $this->createPrimaryReplicaConnection();

self::assertFalse($conn->isConnectedToPrimary());
$conn->connect('replica');
self::assertFalse($conn->isConnectedToPrimary());
$conn->connect('primary');
self::assertTrue($conn->isConnectedToPrimary());
}

public function testNoPrimaryrOnExecuteQuery(): void
public function testNoPrimaryOnExecuteQuery(): void
{
$conn = $this->createPrimaryReplicaConnection();

Expand Down

0 comments on commit 0f368ee

Please sign in to comment.