diff --git a/src/Connection/Http1Connection.php b/src/Connection/Http1Connection.php index 6afa7eb6..75994ada 100644 --- a/src/Connection/Http1Connection.php +++ b/src/Connection/Http1Connection.php @@ -279,18 +279,9 @@ private function readResponse( $parser = new Http1Parser($request, $bodyCallback, $trailersCallback); $start = getCurrentTime(); - $firstRead = true; try { while (null !== $chunk = yield $this->socket->read()) { - if ($firstRead) { - foreach ($request->getEventListeners() as $eventListener) { - yield $eventListener->startReceivingResponse($request, $stream); - } - - $firstRead = false; - } - parseChunk: $response = $parser->parse($chunk); if ($response === null) { @@ -334,6 +325,10 @@ private function readResponse( goto parseChunk; } + foreach ($request->getEventListeners() as $eventListener) { + yield $eventListener->startReceivingResponse($request, $stream); + } + if ($status >= 200 && $status < 300 && $request->getMethod() === 'CONNECT') { foreach ($request->getEventListeners() as $eventListener) { yield $eventListener->completeReceivingResponse($request, $stream); diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 361d6090..6b02b195 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -373,16 +373,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - asyncCall(function () use ($stream, $streamId) { - try { - foreach ($stream->request->getEventListeners() as $eventListener) { - yield $eventListener->startReceivingResponse($stream->request, $stream->stream); - } - } catch (\Throwable $e) { - $this->handleStreamException(new Http2StreamException("Event listener error", $streamId, Http2Parser::CANCEL)); - } - }); - $response = new Response( '2', $status, @@ -396,11 +386,22 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $onInformationalResponse = $stream->request->getInformationalResponseHandler(); if ($onInformationalResponse !== null) { - asyncCall(function () use ($onInformationalResponse, $response, $streamId) { + $stream->preResponseResolution = call(function () use ( + $onInformationalResponse, + $response, + $stream, + $streamId + ) { + yield $stream->preResponseResolution; + try { yield call($onInformationalResponse, $response); } catch (\Throwable $e) { - $this->handleStreamException(new Http2StreamException('Informational response handler threw an exception', $streamId, self::CANCEL)); + $this->handleStreamException(new Http2StreamException( + 'Informational response handler threw an exception', + $streamId, + Http2Parser::CANCEL + )); } }); } @@ -408,6 +409,22 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } + \assert($stream->preResponseResolution === null); + + $stream->preResponseResolution = call(function () use ($stream, $streamId) { + try { + foreach ($stream->request->getEventListeners() as $eventListener) { + yield $eventListener->startReceivingResponse($stream->request, $stream->stream); + } + } catch (\Throwable $e) { + $this->handleStreamException(new Http2StreamException( + "Event listener error", + $streamId, + Http2Parser::CANCEL + )); + } + }); + $stream->body = new Emitter; $stream->trailers = new Deferred; @@ -429,6 +446,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream->pendingResponse->resolve(call(static function () use ($response, $stream) { yield $stream->requestBodyCompletion->promise(); + yield $stream->preResponseResolution; + $stream->preResponseResolution = null; + $stream->pendingResponse = null; return $response; @@ -467,7 +487,12 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool return; } - $this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL)); + $this->writeFrame( + Http2Parser::RST_STREAM, + Http2Parser::NO_FLAG, + $streamId, + \pack("N", Http2Parser::CANCEL) + ); $this->releaseStream($streamId, $exception); }); @@ -570,7 +595,10 @@ public function handlePushPromise(int $parentId, int $streamId, array $pseudo, a "query" => $query, ]); } catch (\Exception $exception) { - $this->handleConnectionException(new Http2ConnectionException("Invalid push URI", Http2Parser::PROTOCOL_ERROR)); + $this->handleConnectionException(new Http2ConnectionException( + "Invalid push URI", + Http2Parser::PROTOCOL_ERROR + )); return; } @@ -607,7 +635,11 @@ static function () { $stream->requestBodyCompletion->resolve(); if ($parentStream->request->getPushHandler() === null) { - $this->handleStreamException(new Http2StreamException("Push promise refused", $streamId, Http2Parser::CANCEL)); + $this->handleStreamException(new Http2StreamException( + "Push promise refused", + $streamId, + Http2Parser::CANCEL + )); return; } @@ -626,7 +658,12 @@ static function () { return; } - $this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL)); + $this->writeFrame( + Http2Parser::RST_STREAM, + Http2Parser::NO_FLAG, + $streamId, + \pack("N", Http2Parser::CANCEL) + ); $this->releaseStream($streamId, $exception); }); @@ -686,7 +723,10 @@ public function handleStreamException(Http2StreamException $exception): void public function handleConnectionException(Http2ConnectionException $exception): void { - $this->shutdown(null, new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception)); + $this->shutdown( + null, + new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception) + ); } public function handleData(int $streamId, string $data): void @@ -714,7 +754,11 @@ public function handleData(int $streamId, string $data): void $stream->received += $length; if ($stream->received >= $stream->request->getBodySizeLimit()) { - $this->handleStreamException(new Http2StreamException("Body size limit exceeded", $streamId, Http2Parser::CANCEL)); + $this->handleStreamException(new Http2StreamException( + "Body size limit exceeded", + $streamId, + Http2Parser::CANCEL + )); return; } @@ -789,7 +833,11 @@ public function handleStreamEnd(int $streamId): void return new Trailers([]); } catch (\Throwable $e) { - $this->handleStreamException(new Http2StreamException("Event listener error", $streamId, Http2Parser::CANCEL)); + $this->handleStreamException(new Http2StreamException( + "Event listener error", + $streamId, + Http2Parser::CANCEL + )); throw $e; } @@ -884,7 +932,12 @@ public function request(Request $request, CancellationToken $cancellationToken, return; } - $this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL)); + $this->writeFrame( + Http2Parser::RST_STREAM, + Http2Parser::NO_FLAG, + $streamId, + \pack("N", Http2Parser::CANCEL) + ); $this->releaseStream($streamId, $exception); }; @@ -1041,8 +1094,12 @@ private function run(): \Generator } } - private function writeFrame(int $type, int $flags = Http2Parser::NO_FLAG, int $stream = 0, string $data = ''): Promise - { + private function writeFrame( + int $type, + int $flags = Http2Parser::NO_FLAG, + int $stream = 0, + string $data = '' + ): Promise { \assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data))); /** @noinspection PhpUnhandledExceptionInspection */ @@ -1155,9 +1212,19 @@ private function writeBufferedData(Http2Stream $stream): Promise } if ($stream->requestBodyComplete) { - $promise = $this->writeFrame(Http2Parser::DATA, Http2Parser::END_STREAM, $stream->id, $stream->requestBodyBuffer); + $promise = $this->writeFrame( + Http2Parser::DATA, + Http2Parser::END_STREAM, + $stream->id, + $stream->requestBodyBuffer + ); } else { - $promise = $this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, $stream->requestBodyBuffer); + $promise = $this->writeFrame( + Http2Parser::DATA, + Http2Parser::NO_FLAG, + $stream->id, + $stream->requestBodyBuffer + ); } $stream->requestBodyBuffer = ""; @@ -1180,7 +1247,12 @@ private function writeBufferedData(Http2Stream $stream): Promise $this->clientWindow -= $windowSize; for ($off = 0; $off < $end; $off += $this->frameSizeLimit) { - $this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, \substr($data, $off, $this->frameSizeLimit)); + $this->writeFrame( + Http2Parser::DATA, + Http2Parser::NO_FLAG, + $stream->id, + \substr($data, $off, $this->frameSizeLimit) + ); } $promise = $this->writeFrame( @@ -1332,7 +1404,7 @@ private function ping(): Promise } /** - * @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no + * @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no * streams have been opened. * @param HttpException|null $reason * @@ -1347,7 +1419,12 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P return call(function () use ($lastId, $reason) { $code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN; $lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0); - $goawayPromise = $this->writeFrame(Http2Parser::GOAWAY, Http2Parser::NO_FLAG, 0, \pack("NN", $lastId, $code)); + $goawayPromise = $this->writeFrame( + Http2Parser::GOAWAY, + Http2Parser::NO_FLAG, + 0, + \pack("NN", $lastId, $code) + ); if ($this->settings !== null) { $settings = $this->settings; @@ -1454,7 +1531,12 @@ private function increaseStreamWindow(Http2Stream $stream): void } if ($increase > 0) { - $this->writeFrame(Http2Parser::WINDOW_UPDATE, Http2Parser::NO_FLAG, $stream->id, \pack("N", self::WINDOW_INCREMENT)); + $this->writeFrame( + Http2Parser::WINDOW_UPDATE, + Http2Parser::NO_FLAG, + $stream->id, + \pack("N", self::WINDOW_INCREMENT) + ); } } } diff --git a/src/Connection/Internal/Http2Stream.php b/src/Connection/Internal/Http2Stream.php index f3021be1..adb4a96c 100644 --- a/src/Connection/Internal/Http2Stream.php +++ b/src/Connection/Internal/Http2Stream.php @@ -10,6 +10,7 @@ use Amp\Http\Client\Internal\ForbidSerialization; use Amp\Http\Client\Request; use Amp\Http\Client\Response; +use Amp\Promise; use Amp\Struct; /** @@ -32,9 +33,12 @@ final class Http2Stream /** @var Response|null */ public $response; - /** @var Deferred */ + /** @var Deferred|null */ public $pendingResponse; + /** @var Promise|null */ + public $preResponseResolution; + /** @var bool */ public $responsePending = true;