Skip to content

Commit

Permalink
Merge pull request #486 from clue-labs/keep-alive
Browse files Browse the repository at this point in the history
Support HTTP keep-alive for HTTP client (reusing persistent connections)
  • Loading branch information
SimonFrings authored Jan 24, 2023
2 parents b34bbed + ebaf6f1 commit a221964
Show file tree
Hide file tree
Showing 12 changed files with 1,075 additions and 240 deletions.
1 change: 1 addition & 0 deletions src/Browser.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Browser
private $baseUrl;
private $protocolVersion = '1.1';
private $defaultHeaders = array(
'Connection' => 'close',
'User-Agent' => 'ReactPHP/1'
);

Expand Down
17 changes: 6 additions & 11 deletions src/Client/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,25 @@
namespace React\Http\Client;

use Psr\Http\Message\RequestInterface;
use React\EventLoop\LoopInterface;
use React\Http\Io\ClientConnectionManager;
use React\Http\Io\ClientRequestStream;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;

/**
* @internal
*/
class Client
{
private $connector;
/** @var ClientConnectionManager */
private $connectionManager;

public function __construct(LoopInterface $loop, ConnectorInterface $connector = null)
public function __construct(ClientConnectionManager $connectionManager)
{
if ($connector === null) {
$connector = new Connector(array(), $loop);
}

$this->connector = $connector;
$this->connectionManager = $connectionManager;
}

/** @return ClientRequestStream */
public function request(RequestInterface $request)
{
return new ClientRequestStream($this->connector, $request);
return new ClientRequestStream($this->connectionManager, $request);
}
}
137 changes: 137 additions & 0 deletions src/Io/ClientConnectionManager.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?php

namespace React\Http\Io;

use Psr\Http\Message\UriInterface;
use React\EventLoop\LoopInterface;
use React\EventLoop\TimerInterface;
use React\Promise\PromiseInterface;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;

/**
* [Internal] Manages outgoing HTTP connections for the HTTP client
*
* @internal
* @final
*/
class ClientConnectionManager
{
/** @var ConnectorInterface */
private $connector;

/** @var LoopInterface */
private $loop;

/** @var string[] */
private $idleUris = array();

/** @var ConnectionInterface[] */
private $idleConnections = array();

/** @var TimerInterface[] */
private $idleTimers = array();

/** @var \Closure[] */
private $idleStreamHandlers = array();

/** @var float */
private $maximumTimeToKeepAliveIdleConnection = 0.001;

public function __construct(ConnectorInterface $connector, LoopInterface $loop)
{
$this->connector = $connector;
$this->loop = $loop;
}

/**
* @return PromiseInterface<ConnectionInterface>
*/
public function connect(UriInterface $uri)
{
$scheme = $uri->getScheme();
if ($scheme !== 'https' && $scheme !== 'http') {
return \React\Promise\reject(new \InvalidArgumentException(
'Invalid request URL given'
));
}

$port = $uri->getPort();
if ($port === null) {
$port = $scheme === 'https' ? 443 : 80;
}
$uri = ($scheme === 'https' ? 'tls://' : '') . $uri->getHost() . ':' . $port;

// Reuse idle connection for same URI if available
foreach ($this->idleConnections as $id => $connection) {
if ($this->idleUris[$id] === $uri) {
assert($this->idleStreamHandlers[$id] instanceof \Closure);
$connection->removeListener('close', $this->idleStreamHandlers[$id]);
$connection->removeListener('data', $this->idleStreamHandlers[$id]);
$connection->removeListener('error', $this->idleStreamHandlers[$id]);

assert($this->idleTimers[$id] instanceof TimerInterface);
$this->loop->cancelTimer($this->idleTimers[$id]);
unset($this->idleUris[$id], $this->idleConnections[$id], $this->idleTimers[$id], $this->idleStreamHandlers[$id]);

return \React\Promise\resolve($connection);
}
}

// Create new connection if no idle connection to same URI is available
return $this->connector->connect($uri);
}

/**
* Hands back an idle connection to the connection manager for possible future reuse.
*
* @return void
*/
public function keepAlive(UriInterface $uri, ConnectionInterface $connection)
{
$scheme = $uri->getScheme();
assert($scheme === 'https' || $scheme === 'http');

$port = $uri->getPort();
if ($port === null) {
$port = $scheme === 'https' ? 443 : 80;
}

$this->idleUris[] = ($scheme === 'https' ? 'tls://' : '') . $uri->getHost() . ':' . $port;
$this->idleConnections[] = $connection;

$that = $this;
$cleanUp = function () use ($connection, $that) {
// call public method to support legacy PHP 5.3
$that->cleanUpConnection($connection);
};

// clean up and close connection when maximum time to keep-alive idle connection has passed
$this->idleTimers[] = $this->loop->addTimer($this->maximumTimeToKeepAliveIdleConnection, $cleanUp);

// clean up and close connection when unexpected close/data/error event happens during idle time
$this->idleStreamHandlers[] = $cleanUp;
$connection->on('close', $cleanUp);
$connection->on('data', $cleanUp);
$connection->on('error', $cleanUp);
}

/**
* @internal
* @return void
*/
public function cleanUpConnection(ConnectionInterface $connection) // private (PHP 5.4+)
{
$id = \array_search($connection, $this->idleConnections, true);
if ($id === false) {
return;
}

assert(\is_int($id));
assert($this->idleTimers[$id] instanceof TimerInterface);
$this->loop->cancelTimer($this->idleTimers[$id]);
unset($this->idleUris[$id], $this->idleConnections[$id], $this->idleTimers[$id], $this->idleStreamHandlers[$id]);

$connection->close();
}
}
63 changes: 37 additions & 26 deletions src/Io/ClientRequestStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
namespace React\Http\Io;

use Evenement\EventEmitter;
use Psr\Http\Message\MessageInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Http\Message\Response;
use React\Promise;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
use React\Stream\WritableStreamInterface;
use RingCentral\Psr7 as gPsr;

Expand All @@ -26,8 +24,8 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac
const STATE_HEAD_WRITTEN = 2;
const STATE_END = 3;

/** @var ConnectorInterface */
private $connector;
/** @var ClientConnectionManager */
private $connectionManager;

/** @var RequestInterface */
private $request;
Expand All @@ -44,9 +42,9 @@ class ClientRequestStream extends EventEmitter implements WritableStreamInterfac

private $pendingWrites = '';

public function __construct(ConnectorInterface $connector, RequestInterface $request)
public function __construct(ClientConnectionManager $connectionManager, RequestInterface $request)
{
$this->connector = $connector;
$this->connectionManager = $connectionManager;
$this->request = $request;
}

Expand All @@ -65,7 +63,7 @@ private function writeHead()
$pendingWrites = &$this->pendingWrites;
$that = $this;

$promise = $this->connect();
$promise = $this->connectionManager->connect($this->request->getUri());
$promise->then(
function (ConnectionInterface $connection) use ($request, &$connectionRef, &$stateRef, &$pendingWrites, $that) {
$connectionRef = $connection;
Expand Down Expand Up @@ -174,11 +172,20 @@ public function handleData($data)
$this->connection = null;
$this->buffer = '';

// take control over connection handling and close connection once response body closes
// take control over connection handling and check if we can reuse the connection once response body closes
$that = $this;
$request = $this->request;
$connectionManager = $this->connectionManager;
$successfulEndReceived = false;
$input = $body = new CloseProtectionStream($connection);
$input->on('close', function () use ($connection, $that) {
$connection->close();
$input->on('close', function () use ($connection, $that, $connectionManager, $request, $response, &$successfulEndReceived) {
// only reuse connection after successful response and both request and response allow keep alive
if ($successfulEndReceived && $connection->isReadable() && $that->hasMessageKeepAliveEnabled($response) && $that->hasMessageKeepAliveEnabled($request)) {
$connectionManager->keepAlive($request->getUri(), $connection);
} else {
$connection->close();
}

$that->close();
});

Expand All @@ -193,6 +200,9 @@ public function handleData($data)
$length = (int) $response->getHeaderLine('Content-Length');
}
$response = $response->withBody($body = new ReadableBodyStream($body, $length));
$body->on('end', function () use (&$successfulEndReceived) {
$successfulEndReceived = true;
});

// emit response with streaming response body (see `Sender`)
$this->emit('response', array($response, $body));
Expand Down Expand Up @@ -253,27 +263,28 @@ public function close()
$this->removeAllListeners();
}

protected function connect()
/**
* @internal
* @return bool
* @link https://www.rfc-editor.org/rfc/rfc9112#section-9.3
* @link https://www.rfc-editor.org/rfc/rfc7230#section-6.1
*/
public function hasMessageKeepAliveEnabled(MessageInterface $message)
{
$scheme = $this->request->getUri()->getScheme();
if ($scheme !== 'https' && $scheme !== 'http') {
return Promise\reject(
new \InvalidArgumentException('Invalid request URL given')
);
}
$connectionOptions = \RingCentral\Psr7\normalize_header(\strtolower($message->getHeaderLine('Connection')));

$host = $this->request->getUri()->getHost();
$port = $this->request->getUri()->getPort();
if (\in_array('close', $connectionOptions, true)) {
return false;
}

if ($scheme === 'https') {
$host = 'tls://' . $host;
if ($message->getProtocolVersion() === '1.1') {
return true;
}

if ($port === null) {
$port = $scheme === 'https' ? 443 : 80;
if (\in_array('keep-alive', $connectionOptions, true)) {
return true;
}

return $this->connector
->connect($host . ':' . $port);
return false;
}
}
14 changes: 6 additions & 8 deletions src/Io/Sender.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use React\Http\Client\Client as HttpClient;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Socket\Connector;
use React\Socket\ConnectorInterface;
use React\Stream\ReadableStreamInterface;

Expand Down Expand Up @@ -49,7 +50,11 @@ class Sender
*/
public static function createFromLoop(LoopInterface $loop, ConnectorInterface $connector = null)
{
return new self(new HttpClient($loop, $connector));
if ($connector === null) {
$connector = new Connector(array(), $loop);
}

return new self(new HttpClient(new ClientConnectionManager($connector, $loop)));
}

private $http;
Expand Down Expand Up @@ -93,13 +98,6 @@ public function send(RequestInterface $request)
$size = 0;
}

// automatically add `Connection: close` request header for HTTP/1.1 requests to avoid connection reuse
if ($request->getProtocolVersion() === '1.1') {
$request = $request->withHeader('Connection', 'close');
} else {
$request = $request->withoutHeader('Connection');
}

// automatically add `Authorization: Basic …` request header if URL includes `user:pass@host`
if ($request->getUri()->getUserInfo() !== '' && !$request->hasHeader('Authorization')) {
$request = $request->withHeader('Authorization', 'Basic ' . \base64_encode($request->getUri()->getUserInfo()));
Expand Down
2 changes: 1 addition & 1 deletion src/Io/Transaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ private function makeRedirectRequest(RequestInterface $request, UriInterface $lo
->withMethod($request->getMethod() === 'HEAD' ? 'HEAD' : 'GET')
->withoutHeader('Content-Type')
->withoutHeader('Content-Length')
->withBody(new EmptyBodyStream());
->withBody(new BufferedBody(''));
}

return $request;
Expand Down
Loading

0 comments on commit a221964

Please sign in to comment.