Skip to content

Commit

Permalink
Add ConnectionTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 4, 2023
1 parent 32df50b commit 9f57ee2
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 20 deletions.
320 changes: 320 additions & 0 deletions src/ConnectionTransaction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,320 @@
<?php declare(strict_types=1);

namespace Amp\Sql\Common;

use Amp\DeferredFuture;
use Amp\Sql\Result;
use Amp\Sql\SqlException;
use Amp\Sql\Statement;
use Amp\Sql\Transaction;
use Amp\Sql\TransactionError;
use Amp\Sql\TransactionIsolation;
use Revolt\EventLoop;

/**
* @template TResult of Result
* @template TStatement of Statement<TResult>
* @template TTransaction of Transaction
* @template TNestedExecutor of NestableTransactionExecutor<TResult, TStatement>
*
* @implements Transaction<TResult, TStatement, TTransaction>
*/
abstract class ConnectionTransaction implements Transaction
{
/** @var \Closure():void */
private readonly \Closure $release;

private int $refCount = 1;

private bool $active = true;

private readonly DeferredFuture $onCommit;
private readonly DeferredFuture $onRollback;
private readonly DeferredFuture $onClose;

private ?DeferredFuture $busy = null;

/**
* Creates a Result of the appropriate type using the Result object returned by the Link object and the
* given release callable.
*
* @param TResult $result
* @param \Closure():void $release
*
* @return TResult
*/
abstract protected function createResult(Result $result, \Closure $release): Result;

/**
* Creates a Statement of the appropriate type using the Statement object returned by the Transaction object and
* the given release callable.
*
* @param TStatement $statement
* @param \Closure():void $release
* @param \Closure():void $awaitBusyResource
*
* @return TStatement
*/
abstract protected function createStatement(
Statement $statement,
\Closure $release,
\Closure $awaitBusyResource,
): Statement;

/**
* @param TTransaction $transaction
* @param TNestedExecutor $executor
* @param non-empty-string $identifier
* @param \Closure():void $release
*
* @return TTransaction
*/
abstract protected function createNestedTransaction(
Transaction $transaction,
NestableTransactionExecutor $executor,
string $identifier,
\Closure $release,
): Transaction;

/**
* @param TNestedExecutor $executor
* @param \Closure():void $release
*/
public function __construct(
private readonly NestableTransactionExecutor $executor,
\Closure $release,
private readonly TransactionIsolation $isolation,
) {
$busy = &$this->busy;
$refCount = &$this->refCount;
$this->release = static function () use (&$busy, &$refCount, $release): void {
$busy?->complete();
$busy = null;

if (--$refCount === 0) {
$release();
}
};

$this->onCommit = new DeferredFuture();
$this->onRollback = new DeferredFuture();
$this->onClose = new DeferredFuture();

$this->onClose($this->release);
}

public function __destruct()
{
if (!$this->active) {
return;
}

if ($this->executor->isClosed()) {
$this->onRollback->complete();
$this->onClose->complete();
}

$busy = &$this->busy;
$executor = $this->executor;
$onRollback = $this->onRollback;
$onClose = $this->onClose;
EventLoop::queue(static function () use (&$busy, $executor, $onRollback, $onClose): void {
try {
while ($busy) {
$busy->getFuture()->await();
}

if (!$executor->isClosed()) {
$executor->rollback();
}
} catch (SqlException) {
// Ignore failure if connection closes during query.
} finally {
$onRollback->complete();
$onClose->complete();
}
});
}

public function getLastUsedAt(): int
{
return $this->executor->getLastUsedAt();
}

public function isNestedTransaction(): bool
{
return false;
}

/**
* Closes and rolls back all changes in the transaction.
*/
public function close(): void
{
if (!$this->active) {
return;
}

$this->rollback(); // Invokes $this->release callback.
}

public function isClosed(): bool
{
return $this->onClose->isComplete();
}

public function onClose(\Closure $onClose): void
{
$this->onClose->getFuture()->finally($onClose);
}

/**
* @return bool True if the transaction is active, false if it has been committed or rolled back.
*/
public function isActive(): bool
{
return $this->active && !$this->executor->isClosed();
}

public function getIsolationLevel(): TransactionIsolation
{
return $this->isolation;
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function query(string $sql): Result
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
$result = $this->executor->query($sql);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

return $this->createResult($result, $this->release);
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*
* @psalm-suppress InvalidReturnStatement, InvalidReturnType
*/
public function prepare(string $sql): Statement
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
$statement = $this->executor->prepare($sql);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

$busy = &$this->busy;
return $this->createStatement($statement, $this->release, static function () use (&$busy): void {
while ($busy) {
$busy->getFuture()->await();
}
});
}

/**
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function execute(string $sql, array $params = []): Result
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
try {
$statement = $this->executor->prepare($sql);
$result = $statement->execute($params);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

return $this->createResult($result, $this->release);
}

public function beginTransaction(): Transaction
{
$this->awaitPendingNestedTransaction();

++$this->refCount;
$this->busy = new DeferredFuture();
try {
$identifier = \bin2hex(\random_bytes(8));
$this->executor->createSavepoint($identifier);
} catch (\Throwable $exception) {
EventLoop::queue($this->release);
throw $exception;
}

/** @psalm-suppress InvalidArgument Recursive templates prevent satisfying this call. */
return $this->createNestedTransaction($this, $this->executor, $identifier, $this->release);
}

/**
* Commits the transaction and makes it inactive.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function commit(): void
{
$this->active = false;
$this->awaitPendingNestedTransaction();

try {
$this->executor->commit();
} finally {
$this->onCommit->complete();
$this->onClose->complete();
}
}

/**
* Rolls back the transaction and makes it inactive.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function rollback(): void
{
$this->active = false;
$this->awaitPendingNestedTransaction();

try {
$this->executor->rollback();
} finally {
$this->onRollback->complete();
$this->onClose->complete();
}
}

public function onCommit(\Closure $onCommit): void
{
$this->onCommit->getFuture()->finally($onCommit);
}

public function onRollback(\Closure $onRollback): void
{
$this->onRollback->getFuture()->finally($onRollback);
}

private function awaitPendingNestedTransaction(): void
{
while ($this->busy) {
$this->busy->getFuture()->await();
}

if ($this->isClosed()) {
throw new TransactionError("The transaction has been committed or rolled back");
}
}
}
18 changes: 10 additions & 8 deletions src/NestableTransactionExecutor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
use Amp\Sql\Executor;
use Amp\Sql\Result;
use Amp\Sql\Statement;
use Amp\Sql\Transaction;
use Amp\Sql\TransactionError;

/**
* @template TResult of Result
Expand All @@ -16,30 +14,34 @@
*/
interface NestableTransactionExecutor extends Executor
{
/**
* Commits the current transaction.
*/
public function commit(): void;

/**
* Rolls back the current transaction.
*/
public function rollback(): void;

/**
* Creates a savepoint with the given identifier.
*
* @param non-empty-string $identifier Savepoint identifier.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function createSavepoint(string $identifier): void;

/**
* Rolls back to the savepoint with the given identifier.
*
* @param non-empty-string $identifier Savepoint identifier.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function rollbackTo(string $identifier): void;

/**
* Releases the savepoint with the given identifier.
*
* @param non-empty-string $identifier Savepoint identifier.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function releaseSavepoint(string $identifier): void;
}
10 changes: 5 additions & 5 deletions src/NestedTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ abstract class NestedTransaction implements Transaction

private bool $active = true;

private ?DeferredFuture $busy = null;
private readonly DeferredFuture $onCommit;
private readonly DeferredFuture $onRollback;
private readonly DeferredFuture $onClose;

private DeferredFuture $onCommit;
private DeferredFuture $onRollback;
private DeferredFuture $onClose;
private ?DeferredFuture $busy = null;

private int $nextId = 1;

Expand Down Expand Up @@ -121,7 +121,7 @@ public function __destruct()
$executor,
$identifier,
$onRollback,
$onClose
$onClose,
): void {
try {
while ($busy) {
Expand Down
Loading

0 comments on commit 9f57ee2

Please sign in to comment.