Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Await events @ HTTP/2 #254

Merged
merged 4 commits into from
Jan 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -279,18 +279,9 @@ private function readResponse(
$parser = new Http1Parser($request, $bodyCallback, $trailersCallback);

$start = getCurrentTime();
$firstRead = true;

try {
while (null !== $chunk = yield $this->socket->read()) {
if ($firstRead) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($request, $stream);
}

$firstRead = false;
}

parseChunk:
$response = $parser->parse($chunk);
if ($response === null) {
Expand Down Expand Up @@ -334,6 +325,10 @@ private function readResponse(
goto parseChunk;
}

foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($request, $stream);
}

if ($status >= 200 && $status < 300 && $request->getMethod() === 'CONNECT') {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeReceivingResponse($request, $stream);
Expand Down
138 changes: 110 additions & 28 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -373,16 +373,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
return;
}

asyncCall(function () use ($stream, $streamId) {
try {
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($stream->request, $stream->stream);
}
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException("Event listener error", $streamId, Http2Parser::CANCEL));
}
});

$response = new Response(
'2',
$status,
Expand All @@ -396,18 +386,45 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
$onInformationalResponse = $stream->request->getInformationalResponseHandler();

if ($onInformationalResponse !== null) {
asyncCall(function () use ($onInformationalResponse, $response, $streamId) {
$stream->preResponseResolution = call(function () use (
$onInformationalResponse,
$response,
$stream,
$streamId
) {
yield $stream->preResponseResolution;

try {
yield call($onInformationalResponse, $response);
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException('Informational response handler threw an exception', $streamId, self::CANCEL));
$this->handleStreamException(new Http2StreamException(
'Informational response handler threw an exception',
$streamId,
Http2Parser::CANCEL
));
}
});
}

return;
}

\assert($stream->preResponseResolution === null);

$stream->preResponseResolution = call(function () use ($stream, $streamId) {
try {
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($stream->request, $stream->stream);
}
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException(
"Event listener error",
$streamId,
Http2Parser::CANCEL
));
}
});

$stream->body = new Emitter;
$stream->trailers = new Deferred;

Expand All @@ -429,6 +446,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
$stream->pendingResponse->resolve(call(static function () use ($response, $stream) {
yield $stream->requestBodyCompletion->promise();

yield $stream->preResponseResolution;
$stream->preResponseResolution = null;

$stream->pendingResponse = null;

return $response;
Expand Down Expand Up @@ -467,7 +487,12 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
return;
}

$this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL));
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
});

Expand Down Expand Up @@ -570,7 +595,10 @@ public function handlePushPromise(int $parentId, int $streamId, array $pseudo, a
"query" => $query,
]);
} catch (\Exception $exception) {
$this->handleConnectionException(new Http2ConnectionException("Invalid push URI", Http2Parser::PROTOCOL_ERROR));
$this->handleConnectionException(new Http2ConnectionException(
"Invalid push URI",
Http2Parser::PROTOCOL_ERROR
));

return;
}
Expand Down Expand Up @@ -607,7 +635,11 @@ static function () {
$stream->requestBodyCompletion->resolve();

if ($parentStream->request->getPushHandler() === null) {
$this->handleStreamException(new Http2StreamException("Push promise refused", $streamId, Http2Parser::CANCEL));
$this->handleStreamException(new Http2StreamException(
"Push promise refused",
$streamId,
Http2Parser::CANCEL
));

return;
}
Expand All @@ -626,7 +658,12 @@ static function () {
return;
}

$this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL));
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
});

Expand Down Expand Up @@ -686,7 +723,10 @@ public function handleStreamException(Http2StreamException $exception): void

public function handleConnectionException(Http2ConnectionException $exception): void
{
$this->shutdown(null, new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception));
$this->shutdown(
null,
new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception)
);
}

public function handleData(int $streamId, string $data): void
Expand Down Expand Up @@ -714,7 +754,11 @@ public function handleData(int $streamId, string $data): void
$stream->received += $length;

if ($stream->received >= $stream->request->getBodySizeLimit()) {
$this->handleStreamException(new Http2StreamException("Body size limit exceeded", $streamId, Http2Parser::CANCEL));
$this->handleStreamException(new Http2StreamException(
"Body size limit exceeded",
$streamId,
Http2Parser::CANCEL
));

return;
}
Expand Down Expand Up @@ -789,7 +833,11 @@ public function handleStreamEnd(int $streamId): void

return new Trailers([]);
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException("Event listener error", $streamId, Http2Parser::CANCEL));
$this->handleStreamException(new Http2StreamException(
"Event listener error",
$streamId,
Http2Parser::CANCEL
));

throw $e;
}
Expand Down Expand Up @@ -884,7 +932,12 @@ public function request(Request $request, CancellationToken $cancellationToken,
return;
}

$this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL));
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
};

Expand Down Expand Up @@ -1041,8 +1094,12 @@ private function run(): \Generator
}
}

private function writeFrame(int $type, int $flags = Http2Parser::NO_FLAG, int $stream = 0, string $data = ''): Promise
{
private function writeFrame(
int $type,
int $flags = Http2Parser::NO_FLAG,
int $stream = 0,
string $data = ''
): Promise {
\assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data)));

/** @noinspection PhpUnhandledExceptionInspection */
Expand Down Expand Up @@ -1155,9 +1212,19 @@ private function writeBufferedData(Http2Stream $stream): Promise
}

if ($stream->requestBodyComplete) {
$promise = $this->writeFrame(Http2Parser::DATA, Http2Parser::END_STREAM, $stream->id, $stream->requestBodyBuffer);
$promise = $this->writeFrame(
Http2Parser::DATA,
Http2Parser::END_STREAM,
$stream->id,
$stream->requestBodyBuffer
);
} else {
$promise = $this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, $stream->requestBodyBuffer);
$promise = $this->writeFrame(
Http2Parser::DATA,
Http2Parser::NO_FLAG,
$stream->id,
$stream->requestBodyBuffer
);
}

$stream->requestBodyBuffer = "";
Expand All @@ -1180,7 +1247,12 @@ private function writeBufferedData(Http2Stream $stream): Promise
$this->clientWindow -= $windowSize;

for ($off = 0; $off < $end; $off += $this->frameSizeLimit) {
$this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, \substr($data, $off, $this->frameSizeLimit));
$this->writeFrame(
Http2Parser::DATA,
Http2Parser::NO_FLAG,
$stream->id,
\substr($data, $off, $this->frameSizeLimit)
);
}

$promise = $this->writeFrame(
Expand Down Expand Up @@ -1332,7 +1404,7 @@ private function ping(): Promise
}

/**
* @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no
* @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no
* streams have been opened.
* @param HttpException|null $reason
*
Expand All @@ -1347,7 +1419,12 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P
return call(function () use ($lastId, $reason) {
$code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN;
$lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0);
$goawayPromise = $this->writeFrame(Http2Parser::GOAWAY, Http2Parser::NO_FLAG, 0, \pack("NN", $lastId, $code));
$goawayPromise = $this->writeFrame(
Http2Parser::GOAWAY,
Http2Parser::NO_FLAG,
0,
\pack("NN", $lastId, $code)
);

if ($this->settings !== null) {
$settings = $this->settings;
Expand Down Expand Up @@ -1454,7 +1531,12 @@ private function increaseStreamWindow(Http2Stream $stream): void
}

if ($increase > 0) {
$this->writeFrame(Http2Parser::WINDOW_UPDATE, Http2Parser::NO_FLAG, $stream->id, \pack("N", self::WINDOW_INCREMENT));
$this->writeFrame(
Http2Parser::WINDOW_UPDATE,
Http2Parser::NO_FLAG,
$stream->id,
\pack("N", self::WINDOW_INCREMENT)
);
}
}
}
6 changes: 5 additions & 1 deletion src/Connection/Internal/Http2Stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Amp\Http\Client\Internal\ForbidSerialization;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Promise;
use Amp\Struct;

/**
Expand All @@ -32,9 +33,12 @@ final class Http2Stream
/** @var Response|null */
public $response;

/** @var Deferred */
/** @var Deferred|null */
public $pendingResponse;

/** @var Promise|null */
public $preResponseResolution;

/** @var bool */
public $responsePending = true;

Expand Down