Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to move command queue to MysqlClient and connection logic to Connection class #190

Merged
merged 3 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 96 additions & 11 deletions src/Io/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace React\Mysql\Io;

use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use React\Mysql\Commands\CommandInterface;
use React\Mysql\Commands\PingCommand;
use React\Mysql\Commands\QueryCommand;
Expand All @@ -29,30 +30,66 @@ class Connection extends EventEmitter
private $executor;

/**
* @var integer
* @var int one of the state constants (may change, but should be used readonly from outside)
* @see self::STATE_*
*/
private $state = self::STATE_AUTHENTICATED;
public $state = self::STATE_AUTHENTICATED;

/**
* @var SocketConnectionInterface
*/
private $stream;

/** @var Parser */
private $parser;

/** @var LoopInterface */
private $loop;

/** @var float */
private $idlePeriod = 0.001;

/** @var ?\React\EventLoop\TimerInterface */
private $idleTimer;

/** @var int */
private $pending = 0;

/**
* Connection constructor.
*
* @param SocketConnectionInterface $stream
* @param Executor $executor
* @param Parser $parser
* @param LoopInterface $loop
* @param ?float $idlePeriod
*/
public function __construct(SocketConnectionInterface $stream, Executor $executor)
public function __construct(SocketConnectionInterface $stream, Executor $executor, Parser $parser, LoopInterface $loop, $idlePeriod)
{
$this->stream = $stream;
$this->executor = $executor;
$this->parser = $parser;

$this->loop = $loop;
if ($idlePeriod !== null) {
$this->idlePeriod = $idlePeriod;
}

$stream->on('error', [$this, 'handleConnectionError']);
$stream->on('close', [$this, 'handleConnectionClosed']);
}

/**
* busy executing some command such as query or ping
*
* @return bool
* @throws void
*/
public function isBusy()
{
return $this->parser->isBusy() || !$this->executor->isIdle();
}

/**
* {@inheritdoc}
*/
Expand All @@ -71,6 +108,7 @@ public function query($sql, array $params = [])
return \React\Promise\reject($e);
}

$this->awake();
$deferred = new Deferred();

// store all result set rows until result set end
Expand All @@ -86,11 +124,13 @@ public function query($sql, array $params = [])

$rows = [];

$this->idle();
$deferred->resolve($result);
});

// resolve / reject status reply (response without result set)
$command->on('error', function ($error) use ($deferred) {
$this->idle();
$deferred->reject($error);
});
$command->on('success', function () use ($command, $deferred) {
Expand All @@ -99,6 +139,7 @@ public function query($sql, array $params = [])
$result->insertId = $command->insertId;
$result->warningCount = $command->warningCount;

$this->idle();
$deferred->resolve($result);
});

Expand All @@ -115,20 +156,30 @@ public function queryStream($sql, $params = [])
$command = new QueryCommand();
$command->setQuery($query);
$this->_doCommand($command);
$this->awake();

$stream = new QueryStream($command, $this->stream);
$stream->on('close', function () {
$this->idle();
});

return new QueryStream($command, $this->stream);
return $stream;
}

public function ping()
{
return new Promise(function ($resolve, $reject) {
$this->_doCommand(new PingCommand())
->on('error', function ($reason) use ($reject) {
$reject($reason);
})
->on('success', function () use ($resolve) {
$resolve(null);
});
$command = $this->_doCommand(new PingCommand());
$this->awake();

$command->on('success', function () use ($resolve) {
$this->idle();
$resolve(null);
});
$command->on('error', function ($reason) use ($reject) {
$this->idle();
$reject($reason);
});
});
}

Expand All @@ -137,6 +188,10 @@ public function quit()
return new Promise(function ($resolve, $reject) {
$command = $this->_doCommand(new QuitCommand());
$this->state = self::STATE_CLOSING;

// mark connection as "awake" until it is closed, so never "idle"
$this->awake();

$command->on('success', function () use ($resolve) {
$resolve(null);
$this->close();
Expand All @@ -158,6 +213,11 @@ public function close()
$remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false;
$this->stream->close();

if ($this->idleTimer !== null) {
$this->loop->cancelTimer($this->idleTimer);
$this->idleTimer = null;
}

// reject all pending commands if connection is closed
while (!$this->executor->isIdle()) {
$command = $this->executor->dequeue();
Expand Down Expand Up @@ -223,4 +283,29 @@ protected function _doCommand(CommandInterface $command)

return $this->executor->enqueue($command);
}

private function awake()
{
++$this->pending;

if ($this->idleTimer !== null) {
$this->loop->cancelTimer($this->idleTimer);
$this->idleTimer = null;
}
}

private function idle()
{
--$this->pending;

if ($this->pending < 1 && $this->idlePeriod >= 0 && $this->state === self::STATE_AUTHENTICATED) {
$this->idleTimer = $this->loop->addTimer($this->idlePeriod, function () {
// soft-close connection and emit close event afterwards both on success or on error
$this->idleTimer = null;
$this->quit()->then(null, function () {
// ignore to avoid reporting unhandled rejection
});
});
}
}
}
5 changes: 3 additions & 2 deletions src/Io/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,12 @@ public function createConnection(
$connecting->cancel();
});

$connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri) {
$idlePeriod = isset($args['idle']) ? (float) $args['idle'] : null;
$connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri, $idlePeriod) {
$executor = new Executor();
$parser = new Parser($stream, $executor);

$connection = new Connection($stream, $executor);
$connection = new Connection($stream, $executor, $parser, $this->loop, $idlePeriod);
$command = $executor->enqueue($authCommand);
$parser->start();

Expand Down
11 changes: 11 additions & 0 deletions src/Io/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ public function __construct(DuplexStreamInterface $stream, Executor $executor)
});
}

/**
* busy executing some command such as query or ping
*
* @return bool
* @throws void
*/
public function isBusy()
{
return $this->currCommand !== null;
}

public function start()
{
$this->stream->on('data', [$this, 'handleData']);
Expand Down
Loading