-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feature #35115 [HttpClient] Add portable HTTP/2 implementation based …
…on Amp's HTTP client (nicolas-grekas) This PR was merged into the 5.1-dev branch. Discussion ---------- [HttpClient] Add portable HTTP/2 implementation based on Amp's HTTP client | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | - | License | MIT | Doc PR | - This PR provides an `AmpHttpClient`, which is an adapter between [`amphp/http-client`](https://github.com/amphp/http-client) and `symfony/http-client-contracts`. ~This is an early experiment for now, but it works already on the happy path:~ I have a local h2-intensive script, and while it's slower than CurlHttpClient, this performs quite well! This could provide a portable implementation of HTTP/2 \o/ /cc @kelunik FYI Todo: - [x] async request/response - [x] streaming and multiplexing - [x] handle all ssl options - [x] timers info - [x] upload/download progress info - [x] upload/download progress callback - [x] HTTP proxy support - [x] streamed upload - [x] public-key pinning - [x] peer certificate capturing - [x] stream casting with `$response->toStream()` - [x] ~amphp/http-client#241 - [x] extensive debug info - [x] HTTP/2 PUSH support - [x] amphp/http-client#243 - [x] amphp/http-client#242 - [x] amphp/http-client#250 - [x] amphp/http-client#239 - [x] ~kelunik/certificate#2 - [x] amphp/socket#71 - [x] amphp/http-client#252 Commits ------- ef113feeb3 [HttpClient] Add portable HTTP/2 implementation based on Amp's HTTP client
- Loading branch information
Showing
17 changed files
with
1,408 additions
and
188 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\HttpClient; | ||
|
||
use Amp\CancelledException; | ||
use Amp\Http\Client\DelegateHttpClient; | ||
use Amp\Http\Client\InterceptedHttpClient; | ||
use Amp\Http\Client\PooledHttpClient; | ||
use Amp\Http\Client\Request; | ||
use Amp\Http\Tunnel\Http1TunnelConnector; | ||
use Psr\Log\LoggerAwareInterface; | ||
use Psr\Log\LoggerAwareTrait; | ||
use Symfony\Component\HttpClient\Exception\TransportException; | ||
use Symfony\Component\HttpClient\Internal\AmpClientState; | ||
use Symfony\Component\HttpClient\Response\AmpResponse; | ||
use Symfony\Component\HttpClient\Response\ResponseStream; | ||
use Symfony\Contracts\HttpClient\HttpClientInterface; | ||
use Symfony\Contracts\HttpClient\ResponseInterface; | ||
use Symfony\Contracts\HttpClient\ResponseStreamInterface; | ||
use Symfony\Contracts\Service\ResetInterface; | ||
|
||
if (!interface_exists(DelegateHttpClient::class)) { | ||
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\AmpHttpClient" as the "amphp/http-client" package is not installed. Try running "composer require amphp/http-client".'); | ||
} | ||
|
||
/** | ||
* A portable implementation of the HttpClientInterface contracts based on Amp's HTTP client. | ||
* | ||
* @author Nicolas Grekas <p@tchwork.com> | ||
*/ | ||
final class AmpHttpClient implements HttpClientInterface, LoggerAwareInterface, ResetInterface | ||
{ | ||
use HttpClientTrait; | ||
use LoggerAwareTrait; | ||
|
||
private $defaultOptions = self::OPTIONS_DEFAULTS; | ||
|
||
/** @var AmpClientState */ | ||
private $multi; | ||
|
||
/** | ||
* @param array $defaultOptions Default requests' options | ||
* @param callable $clientConfigurator A callable that builds a {@see DelegateHttpClient} from a {@see PooledHttpClient}; | ||
* passing null builds an {@see InterceptedHttpClient} with 2 retries on failures | ||
* @param int $maxHostConnections The maximum number of connections to a single host | ||
* @param int $maxPendingPushes The maximum number of pushed responses to accept in the queue | ||
* | ||
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options | ||
*/ | ||
public function __construct(array $defaultOptions = [], callable $clientConfigurator = null, int $maxHostConnections = 6, int $maxPendingPushes = 50) | ||
{ | ||
$this->defaultOptions['buffer'] = $this->defaultOptions['buffer'] ?? \Closure::fromCallable([__CLASS__, 'shouldBuffer']); | ||
|
||
if ($defaultOptions) { | ||
[, $this->defaultOptions] = self::prepareRequest(null, null, $defaultOptions, $this->defaultOptions); | ||
} | ||
|
||
$this->multi = new AmpClientState($clientConfigurator, $maxHostConnections, $maxPendingPushes, $this->logger); | ||
} | ||
|
||
/** | ||
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options | ||
* | ||
* {@inheritdoc} | ||
*/ | ||
public function request(string $method, string $url, array $options = []): ResponseInterface | ||
{ | ||
[$url, $options] = self::prepareRequest($method, $url, $options, $this->defaultOptions); | ||
|
||
$options['proxy'] = self::getProxy($options['proxy'], $url, $options['no_proxy']); | ||
|
||
if (null !== $options['proxy'] && !class_exists(Http1TunnelConnector::class)) { | ||
throw new \LogicException('You cannot use the "proxy" option as the "amphp/http-tunnel" package is not installed. Try running "composer require amphp/http-tunnel".'); | ||
} | ||
|
||
if ('' !== $options['body'] && 'POST' === $method && !isset($options['normalized_headers']['content-type'])) { | ||
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded'; | ||
} | ||
|
||
if (!isset($options['normalized_headers']['user-agent'])) { | ||
$options['headers'][] = 'User-Agent: Symfony HttpClient/Amp'; | ||
} | ||
|
||
if (0 < $options['max_duration']) { | ||
$options['timeout'] = min($options['max_duration'], $options['timeout']); | ||
} | ||
|
||
if ($options['resolve']) { | ||
$this->multi->dnsCache = $options['resolve'] + $this->multi->dnsCache; | ||
} | ||
|
||
if ($options['peer_fingerprint'] && !isset($options['peer_fingerprint']['pin-sha256'])) { | ||
throw new TransportException(__CLASS__.' supports only "pin-sha256" fingerprints.'); | ||
} | ||
|
||
$request = new Request(implode('', $url), $method); | ||
|
||
if ($options['http_version']) { | ||
switch ((float) $options['http_version']) { | ||
case 1.0: $request->setProtocolVersions(['1.0']); break; | ||
case 1.1: $request->setProtocolVersions(['1.1', '1.0']); break; | ||
default: $request->setProtocolVersions(['2', '1.1', '1.0']); break; | ||
} | ||
} | ||
|
||
foreach ($options['headers'] as $v) { | ||
$h = explode(': ', $v, 2); | ||
$request->addHeader($h[0], $h[1]); | ||
} | ||
|
||
$request->setTcpConnectTimeout(1000 * $options['timeout']); | ||
$request->setTlsHandshakeTimeout(1000 * $options['timeout']); | ||
$request->setTransferTimeout(1000 * $options['max_duration']); | ||
|
||
if ('' !== $request->getUri()->getUserInfo() && !$request->hasHeader('authorization')) { | ||
$auth = explode(':', $request->getUri()->getUserInfo(), 2); | ||
$auth = array_map('rawurldecode', $auth) + [1 => '']; | ||
$request->setHeader('Authorization', 'Basic '.base64_encode(implode(':', $auth))); | ||
} | ||
|
||
return new AmpResponse($this->multi, $request, $options, $this->logger); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function stream($responses, float $timeout = null): ResponseStreamInterface | ||
{ | ||
if ($responses instanceof AmpResponse) { | ||
$responses = [$responses]; | ||
} elseif (!is_iterable($responses)) { | ||
throw new \TypeError(sprintf('%s() expects parameter 1 to be an iterable of AmpResponse objects, %s given.', __METHOD__, \is_object($responses) ? \get_class($responses) : \gettype($responses))); | ||
} | ||
|
||
return new ResponseStream(AmpResponse::stream($responses, $timeout)); | ||
} | ||
|
||
public function reset() | ||
{ | ||
$this->multi->dnsCache = []; | ||
|
||
foreach ($this->multi->pushedResponses as $authority => $pushedResponses) { | ||
foreach ($pushedResponses as [$pushedUrl, $pushDeferred]) { | ||
$pushDeferred->fail(new CancelledException()); | ||
|
||
if ($this->logger) { | ||
$this->logger->debug(sprintf('Unused pushed response: "%s"', $pushedUrl)); | ||
} | ||
} | ||
} | ||
|
||
$this->multi->pushedResponses = []; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
<?php | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\HttpClient\Internal; | ||
|
||
use Amp\ByteStream\InputStream; | ||
use Amp\ByteStream\ResourceInputStream; | ||
use Amp\Http\Client\RequestBody; | ||
use Amp\Promise; | ||
use Amp\Success; | ||
use Symfony\Component\HttpClient\Exception\TransportException; | ||
|
||
/** | ||
* @author Nicolas Grekas <p@tchwork.com> | ||
* | ||
* @internal | ||
*/ | ||
class AmpBody implements RequestBody, InputStream | ||
{ | ||
private $body; | ||
private $onProgress; | ||
private $offset = 0; | ||
private $length = -1; | ||
private $uploaded; | ||
|
||
public function __construct($body, &$info, \Closure $onProgress) | ||
{ | ||
$this->body = $body; | ||
$this->info = &$info; | ||
$this->onProgress = $onProgress; | ||
|
||
if (\is_resource($body)) { | ||
$this->offset = ftell($body); | ||
$this->length = fstat($body)['size']; | ||
$this->body = new ResourceInputStream($body); | ||
} elseif (\is_string($body)) { | ||
$this->length = \strlen($body); | ||
} | ||
} | ||
|
||
public function createBodyStream(): InputStream | ||
{ | ||
if (null !== $this->uploaded) { | ||
$this->uploaded = null; | ||
|
||
if (\is_string($this->body)) { | ||
$this->offset = 0; | ||
} elseif ($this->body instanceof ResourceInputStream) { | ||
fseek($this->body->getResource(), $this->offset); | ||
} | ||
} | ||
|
||
return $this; | ||
} | ||
|
||
public function getHeaders(): Promise | ||
{ | ||
return new Success([]); | ||
} | ||
|
||
public function getBodyLength(): Promise | ||
{ | ||
return new Success($this->length - $this->offset); | ||
} | ||
|
||
public function read(): Promise | ||
{ | ||
$this->info['size_upload'] += $this->uploaded; | ||
$this->uploaded = 0; | ||
($this->onProgress)(); | ||
|
||
$chunk = $this->doRead(); | ||
$chunk->onResolve(function ($e, $data) { | ||
if (null !== $data) { | ||
$this->uploaded = \strlen($data); | ||
} else { | ||
$this->info['upload_content_length'] = $this->info['size_upload']; | ||
} | ||
}); | ||
|
||
return $chunk; | ||
} | ||
|
||
public static function rewind(RequestBody $body): RequestBody | ||
{ | ||
if (!$body instanceof self) { | ||
return $body; | ||
} | ||
|
||
$body->uploaded = null; | ||
|
||
if ($body->body instanceof ResourceInputStream) { | ||
fseek($body->body->getResource(), $body->offset); | ||
|
||
return new $body($body->body, $body->info, $body->onProgress); | ||
} | ||
|
||
if (\is_string($body->body)) { | ||
$body->offset = 0; | ||
} | ||
|
||
return $body; | ||
} | ||
|
||
private function doRead(): Promise | ||
{ | ||
if ($this->body instanceof ResourceInputStream) { | ||
return $this->body->read(); | ||
} | ||
|
||
if (null === $this->offset || !$this->length) { | ||
return new Success(); | ||
} | ||
|
||
if (\is_string($this->body)) { | ||
$this->offset = null; | ||
|
||
return new Success($this->body); | ||
} | ||
|
||
if ('' === $data = ($this->body)(16372)) { | ||
$this->offset = null; | ||
|
||
return new Success(); | ||
} | ||
|
||
if (!\is_string($data)) { | ||
throw new TransportException(sprintf('Return value of the "body" option callback must be string, %s returned.', \gettype($data))); | ||
} | ||
|
||
return new Success($data); | ||
} | ||
} |
Oops, something went wrong.