From 5c290647d821d54fbd3c794b6991242c004b4367 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 17 Jan 2020 21:37:50 +0100 Subject: [PATCH 1/4] Await event listeners and informational response handler on HTTP/2 This makes the behavior more in sync with HTTP/1. --- src/Connection/Internal/Http2ConnectionProcessor.php | 9 +++++++-- src/Connection/Internal/Http2Stream.php | 6 +++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 361d6090..a6c581ff 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -373,7 +373,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - asyncCall(function () use ($stream, $streamId) { + $stream->preResponseResolution = call(function () use ($stream, $streamId) { try { foreach ($stream->request->getEventListeners() as $eventListener) { yield $eventListener->startReceivingResponse($stream->request, $stream->stream); @@ -396,7 +396,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $onInformationalResponse = $stream->request->getInformationalResponseHandler(); if ($onInformationalResponse !== null) { - asyncCall(function () use ($onInformationalResponse, $response, $streamId) { + $stream->preResponseResolution = call(function () use ($onInformationalResponse, $response, $stream, $streamId) { + yield $stream->preResponseResolution; + try { yield call($onInformationalResponse, $response); } catch (\Throwable $e) { @@ -429,6 +431,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream->pendingResponse->resolve(call(static function () use ($response, $stream) { yield $stream->requestBodyCompletion->promise(); + yield $stream->preResponseResolution; + $stream->preResponseResolution = null; + $stream->pendingResponse = null; return $response; diff --git a/src/Connection/Internal/Http2Stream.php b/src/Connection/Internal/Http2Stream.php index f3021be1..adb4a96c 100644 --- a/src/Connection/Internal/Http2Stream.php +++ b/src/Connection/Internal/Http2Stream.php @@ -10,6 +10,7 @@ use Amp\Http\Client\Internal\ForbidSerialization; use Amp\Http\Client\Request; use Amp\Http\Client\Response; +use Amp\Promise; use Amp\Struct; /** @@ -32,9 +33,12 @@ final class Http2Stream /** @var Response|null */ public $response; - /** @var Deferred */ + /** @var Deferred|null */ public $pendingResponse; + /** @var Promise|null */ + public $preResponseResolution; + /** @var bool */ public $responsePending = true; From f59f9b3a17e0e169376134b0ea336de32e1174be Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 17 Jan 2020 21:39:32 +0100 Subject: [PATCH 2/4] Call startReceivingResponse only once for HTTP/2 It was called multiple times on informational responses. --- .../Internal/Http2ConnectionProcessor.php | 129 ++++++++++++++---- 1 file changed, 103 insertions(+), 26 deletions(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index a6c581ff..7fb6bb9d 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -373,15 +373,21 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - $stream->preResponseResolution = call(function () use ($stream, $streamId) { - try { - foreach ($stream->request->getEventListeners() as $eventListener) { - yield $eventListener->startReceivingResponse($stream->request, $stream->stream); + if ($stream->preResponseResolution === null) { + $stream->preResponseResolution = call(function () use ($stream, $streamId) { + try { + foreach ($stream->request->getEventListeners() as $eventListener) { + yield $eventListener->startReceivingResponse($stream->request, $stream->stream); + } + } catch (\Throwable $e) { + $this->handleStreamException(new Http2StreamException( + "Event listener error", + $streamId, + Http2Parser::CANCEL + )); } - } catch (\Throwable $e) { - $this->handleStreamException(new Http2StreamException("Event listener error", $streamId, Http2Parser::CANCEL)); - } - }); + }); + } $response = new Response( '2', @@ -396,13 +402,22 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $onInformationalResponse = $stream->request->getInformationalResponseHandler(); if ($onInformationalResponse !== null) { - $stream->preResponseResolution = call(function () use ($onInformationalResponse, $response, $stream, $streamId) { + $stream->preResponseResolution = call(function () use ( + $onInformationalResponse, + $response, + $stream, + $streamId + ) { yield $stream->preResponseResolution; try { yield call($onInformationalResponse, $response); } catch (\Throwable $e) { - $this->handleStreamException(new Http2StreamException('Informational response handler threw an exception', $streamId, self::CANCEL)); + $this->handleStreamException(new Http2StreamException( + 'Informational response handler threw an exception', + $streamId, + self::CANCEL + )); } }); } @@ -472,7 +487,12 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - $this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL)); + $this->writeFrame( + Http2Parser::RST_STREAM, + Http2Parser::NO_FLAG, + $streamId, + \pack("N", Http2Parser::CANCEL) + ); $this->releaseStream($streamId, $exception); }); @@ -575,7 +595,10 @@ public function handlePushPromise(int $parentId, int $streamId, array $pseudo, a "query" => $query, ]); } catch (\Exception $exception) { - $this->handleConnectionException(new Http2ConnectionException("Invalid push URI", Http2Parser::PROTOCOL_ERROR)); + $this->handleConnectionException(new Http2ConnectionException( + "Invalid push URI", + Http2Parser::PROTOCOL_ERROR + )); return; } @@ -612,7 +635,11 @@ static function () { $stream->requestBodyCompletion->resolve(); if ($parentStream->request->getPushHandler() === null) { - $this->handleStreamException(new Http2StreamException("Push promise refused", $streamId, Http2Parser::CANCEL)); + $this->handleStreamException(new Http2StreamException( + "Push promise refused", + $streamId, + Http2Parser::CANCEL + )); return; } @@ -631,7 +658,12 @@ static function () { return; } - $this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL)); + $this->writeFrame( + Http2Parser::RST_STREAM, + Http2Parser::NO_FLAG, + $streamId, + \pack("N", Http2Parser::CANCEL) + ); $this->releaseStream($streamId, $exception); }); @@ -691,7 +723,10 @@ public function handleStreamException(Http2StreamException $exception): void public function handleConnectionException(Http2ConnectionException $exception): void { - $this->shutdown(null, new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception)); + $this->shutdown( + null, + new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception) + ); } public function handleData(int $streamId, string $data): void @@ -719,7 +754,11 @@ public function handleData(int $streamId, string $data): void $stream->received += $length; if ($stream->received >= $stream->request->getBodySizeLimit()) { - $this->handleStreamException(new Http2StreamException("Body size limit exceeded", $streamId, Http2Parser::CANCEL)); + $this->handleStreamException(new Http2StreamException( + "Body size limit exceeded", + $streamId, + Http2Parser::CANCEL + )); return; } @@ -794,7 +833,11 @@ public function handleStreamEnd(int $streamId): void return new Trailers([]); } catch (\Throwable $e) { - $this->handleStreamException(new Http2StreamException("Event listener error", $streamId, Http2Parser::CANCEL)); + $this->handleStreamException(new Http2StreamException( + "Event listener error", + $streamId, + Http2Parser::CANCEL + )); throw $e; } @@ -889,7 +932,12 @@ public function request(Request $request, CancellationToken $cancellationToken, return; } - $this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL)); + $this->writeFrame( + Http2Parser::RST_STREAM, + Http2Parser::NO_FLAG, + $streamId, + \pack("N", Http2Parser::CANCEL) + ); $this->releaseStream($streamId, $exception); }; @@ -1046,8 +1094,12 @@ private function run(): \Generator } } - private function writeFrame(int $type, int $flags = Http2Parser::NO_FLAG, int $stream = 0, string $data = ''): Promise - { + private function writeFrame( + int $type, + int $flags = Http2Parser::NO_FLAG, + int $stream = 0, + string $data = '' + ): Promise { \assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data))); /** @noinspection PhpUnhandledExceptionInspection */ @@ -1160,9 +1212,19 @@ private function writeBufferedData(Http2Stream $stream): Promise } if ($stream->requestBodyComplete) { - $promise = $this->writeFrame(Http2Parser::DATA, Http2Parser::END_STREAM, $stream->id, $stream->requestBodyBuffer); + $promise = $this->writeFrame( + Http2Parser::DATA, + Http2Parser::END_STREAM, + $stream->id, + $stream->requestBodyBuffer + ); } else { - $promise = $this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, $stream->requestBodyBuffer); + $promise = $this->writeFrame( + Http2Parser::DATA, + Http2Parser::NO_FLAG, + $stream->id, + $stream->requestBodyBuffer + ); } $stream->requestBodyBuffer = ""; @@ -1185,7 +1247,12 @@ private function writeBufferedData(Http2Stream $stream): Promise $this->clientWindow -= $windowSize; for ($off = 0; $off < $end; $off += $this->frameSizeLimit) { - $this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, \substr($data, $off, $this->frameSizeLimit)); + $this->writeFrame( + Http2Parser::DATA, + Http2Parser::NO_FLAG, + $stream->id, + \substr($data, $off, $this->frameSizeLimit) + ); } $promise = $this->writeFrame( @@ -1337,7 +1404,7 @@ private function ping(): Promise } /** - * @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no + * @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no * streams have been opened. * @param HttpException|null $reason * @@ -1352,7 +1419,12 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P return call(function () use ($lastId, $reason) { $code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN; $lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0); - $goawayPromise = $this->writeFrame(Http2Parser::GOAWAY, Http2Parser::NO_FLAG, 0, \pack("NN", $lastId, $code)); + $goawayPromise = $this->writeFrame( + Http2Parser::GOAWAY, + Http2Parser::NO_FLAG, + 0, + \pack("NN", $lastId, $code) + ); if ($this->settings !== null) { $settings = $this->settings; @@ -1459,7 +1531,12 @@ private function increaseStreamWindow(Http2Stream $stream): void } if ($increase > 0) { - $this->writeFrame(Http2Parser::WINDOW_UPDATE, Http2Parser::NO_FLAG, $stream->id, \pack("N", self::WINDOW_INCREMENT)); + $this->writeFrame( + Http2Parser::WINDOW_UPDATE, + Http2Parser::NO_FLAG, + $stream->id, + \pack("N", self::WINDOW_INCREMENT) + ); } } } From 54094e944646d19eb4066ded32bd892e36591a49 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 17 Jan 2020 22:20:29 +0100 Subject: [PATCH 3/4] Invoke startReceivingResponse only if final response is started Users wishing to react to intermediate responses received can use the informational response handler API. --- src/Connection/Http1Connection.php | 13 +++---- .../Internal/Http2ConnectionProcessor.php | 34 +++++++++---------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/src/Connection/Http1Connection.php b/src/Connection/Http1Connection.php index 6afa7eb6..75994ada 100644 --- a/src/Connection/Http1Connection.php +++ b/src/Connection/Http1Connection.php @@ -279,18 +279,9 @@ private function readResponse( $parser = new Http1Parser($request, $bodyCallback, $trailersCallback); $start = getCurrentTime(); - $firstRead = true; try { while (null !== $chunk = yield $this->socket->read()) { - if ($firstRead) { - foreach ($request->getEventListeners() as $eventListener) { - yield $eventListener->startReceivingResponse($request, $stream); - } - - $firstRead = false; - } - parseChunk: $response = $parser->parse($chunk); if ($response === null) { @@ -334,6 +325,10 @@ private function readResponse( goto parseChunk; } + foreach ($request->getEventListeners() as $eventListener) { + yield $eventListener->startReceivingResponse($request, $stream); + } + if ($status >= 200 && $status < 300 && $request->getMethod() === 'CONNECT') { foreach ($request->getEventListeners() as $eventListener) { yield $eventListener->completeReceivingResponse($request, $stream); diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 7fb6bb9d..f8bf3ae8 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -373,22 +373,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - if ($stream->preResponseResolution === null) { - $stream->preResponseResolution = call(function () use ($stream, $streamId) { - try { - foreach ($stream->request->getEventListeners() as $eventListener) { - yield $eventListener->startReceivingResponse($stream->request, $stream->stream); - } - } catch (\Throwable $e) { - $this->handleStreamException(new Http2StreamException( - "Event listener error", - $streamId, - Http2Parser::CANCEL - )); - } - }); - } - $response = new Response( '2', $status, @@ -416,7 +400,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $this->handleStreamException(new Http2StreamException( 'Informational response handler threw an exception', $streamId, - self::CANCEL + Http2Parser::CANCEL )); } }); @@ -425,6 +409,22 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } + \assert($stream->preResponseResolution); + + $stream->preResponseResolution = call(function () use ($stream, $streamId) { + try { + foreach ($stream->request->getEventListeners() as $eventListener) { + yield $eventListener->startReceivingResponse($stream->request, $stream->stream); + } + } catch (\Throwable $e) { + $this->handleStreamException(new Http2StreamException( + "Event listener error", + $streamId, + Http2Parser::CANCEL + )); + } + }); + $stream->body = new Emitter; $stream->trailers = new Deferred; From a694372f7eebb16ec88681e723cbee8422772ed6 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 17 Jan 2020 22:40:33 +0100 Subject: [PATCH 4/4] Fix assertion --- src/Connection/Internal/Http2ConnectionProcessor.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index f8bf3ae8..6b02b195 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -409,7 +409,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - \assert($stream->preResponseResolution); + \assert($stream->preResponseResolution === null); $stream->preResponseResolution = call(function () use ($stream, $streamId) { try {