diff --git a/src/Driver/Http1Driver.php b/src/Driver/Http1Driver.php index caf11c81..d7624ac5 100644 --- a/src/Driver/Http1Driver.php +++ b/src/Driver/Http1Driver.php @@ -110,7 +110,7 @@ public function handleClient( do { if ($this->http2driver) { - $this->removeTimeout(); + $this->suspendTimeout(); $this->http2driver->handleClientWithBuffer($buffer, $this->readableStream); return; } @@ -368,6 +368,8 @@ public function handleClient( ]) ); + $this->removeTimeout(); + // Internal upgrade $this->http2driver = new Http2Driver( requestHandler: $this->requestHandler, @@ -412,6 +414,8 @@ public function handleClient( continue; } + $this->suspendTimeout(); + $this->currentBuffer = $buffer; $this->handleRequest($request); $this->pendingResponseCount--; @@ -740,7 +744,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void { $this->bodyQueue = null; $queue->complete(); - $this->updateTimeout(); + $this->suspendTimeout(); if ($this->http2driver) { continue; @@ -802,6 +806,11 @@ private function updateTimeout(): void self::getTimeoutQueue()->update($this->client, 0, $this->connectionTimeout); } + private function suspendTimeout(): void + { + self::getTimeoutQueue()->suspend($this->client, 0); + } + private function removeTimeout(): void { self::getTimeoutQueue()->remove($this->client, 0); diff --git a/src/Driver/Http2Driver.php b/src/Driver/Http2Driver.php index d981d853..2acc9282 100644 --- a/src/Driver/Http2Driver.php +++ b/src/Driver/Http2Driver.php @@ -20,6 +20,7 @@ use Amp\Http\Server\ClientException; use Amp\Http\Server\Driver\Internal\AbstractHttpDriver; use Amp\Http\Server\Driver\Internal\Http2Stream; +use Amp\Http\Server\Driver\Internal\StreamTimeoutTracker; use Amp\Http\Server\ErrorHandler; use Amp\Http\Server\Push; use Amp\Http\Server\Request; @@ -67,6 +68,8 @@ final class Http2Driver extends AbstractHttpDriver implements Http2Processor private ReadableStream $readableStream; private WritableStream $writableStream; + private StreamTimeoutTracker $timeoutTracker; + private int $serverWindow = self::DEFAULT_WINDOW_SIZE; private int $clientWindow = self::DEFAULT_WINDOW_SIZE; @@ -140,9 +143,12 @@ public function handleClient( $this->readableStream = $readableStream; $this->writableStream = $writableStream; - self::getTimeoutQueue()->insert($this->client, 0, fn () => $this->shutdown( - new ClientException($this->client, 'Shutting down connection due to inactivity'), - ), $this->streamTimeout); + $this->timeoutTracker = new StreamTimeoutTracker( + $this->client, + self::getTimeoutQueue(), + $this->connectionTimeout, + fn () => $this->shutdown(new ClientException($this->client, 'Shutting down connection due to inactivity')), + ); $this->processClientInput(); } @@ -163,6 +169,13 @@ public function initializeWriting( $this->client = $client; $this->writableStream = $writableStream; + $this->timeoutTracker = new StreamTimeoutTracker( + $this->client, + self::getTimeoutQueue(), + $this->connectionTimeout, + fn () => $this->shutdown(new ClientException($this->client, 'Shutting down connection due to inactivity')), + ); + if ($this->settings !== null) { // Upgraded connections automatically assume an initial stream with ID 1. // No data will be incoming on this stream, so body size of 0. @@ -196,10 +209,6 @@ public function handleClientWithBuffer(string $buffer, ReadableStream $readableS $this->readableStream = $readableStream; - self::getTimeoutQueue()->insert($this->client, 0, fn () => $this->shutdown( - new ClientException($this->client, 'Shutting down connection due to inactivity'), - ), $this->streamTimeout); - $this->processClientInput($buffer); } @@ -231,7 +240,6 @@ private function processClientInput(?string $chunk = null): void )); } finally { $parser->cancel(); - self::getTimeoutQueue()->remove($this->client, 0); } } @@ -646,10 +654,9 @@ private function createStream(int $id, int $bodySizeLimit, int $flags = Http2Str \assert(!isset($this->streams[$id])); if ($id & 1) { - self::getTimeoutQueue()->insert( - $this->client, + $this->timeoutTracker->insert( $id, - fn () => $this->releaseStream( + fn (int $id) => $this->releaseStream( $id, new ClientException($this->client, "Closing stream due to inactivity"), ), @@ -672,7 +679,7 @@ private function releaseStream(int $id, ?ClientException $exception = null): voi $this->streams[$id]->deferredCancellation->cancel(); if ($id & 1) { - self::getTimeoutQueue()->remove($this->client, $id); + $this->timeoutTracker->remove($id); } ($this->bodyQueues[$id] ?? null)?->error( @@ -692,10 +699,15 @@ private function releaseStream(int $id, ?ClientException $exception = null): voi private function updateTimeout(int $id): void { - self::getTimeoutQueue()->update($this->client, 0, $this->connectionTimeout); + if ($id & 1) { + $this->timeoutTracker->update($id, $this->streamTimeout); + } + } + private function suspendTimeout(int $id): void + { if ($id & 1) { - self::getTimeoutQueue()->update($this->client, $id, $this->streamTimeout); + $this->timeoutTracker->suspend($id); } } @@ -786,7 +798,7 @@ public function handlePing(string $data): void { if (!$this->pinged) { // Ensure there are a few extra seconds for request after first ping. - self::getTimeoutQueue()->update($this->client, 0, 5); + $this->timeoutTracker->ping(5); } $this->pinged++; @@ -899,8 +911,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool // Header frames can be received on previously opened streams (trailer headers). $this->remoteStreamId = \max($streamId, $this->remoteStreamId); - $this->updateTimeout($streamId); - if (isset($this->trailerDeferreds[$streamId]) && $stream->state & Http2Stream::RESERVED) { if (!$streamEnded) { throw new Http2ConnectionException( @@ -932,12 +942,16 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool unset($this->bodyQueues[$streamId], $this->trailerDeferreds[$streamId]); + $this->suspendTimeout($streamId); + $queue->complete(); $deferred->complete($headers); return; } + $this->updateTimeout($streamId); + if ($stream->state & Http2Stream::RESERVED) { throw new Http2StreamException( "Stream already reserved", diff --git a/src/Driver/Internal/StreamTimeoutTracker.php b/src/Driver/Internal/StreamTimeoutTracker.php new file mode 100644 index 00000000..e1749dea --- /dev/null +++ b/src/Driver/Internal/StreamTimeoutTracker.php @@ -0,0 +1,92 @@ + */ + private array $callbacks = []; + + private int $pingTimeout = 0; + + /** + * @param \Closure():void $onConnectionTimeout + */ + public function __construct( + private readonly Client $client, + private readonly TimeoutQueue $timeoutQueue, + private readonly int $connectionTimeout, + \Closure $onConnectionTimeout, + ) { + $this->onStreamTimeout = weakClosure(function (Client $client, int $streamId): void { + \assert(isset($this->callbacks[$streamId]), "Callback missing for stream ID " . $streamId); + + $callback = $this->callbacks[$streamId]; + unset($this->callbacks[$streamId]); + + async($callback, $streamId)->ignore(); + + if (!$this->callbacks) { + $this->timeoutQueue->update($this->client, 0, \min(0, $this->pingTimeout - \time())); + } + }); + + $timeoutQueue->insert($this->client, 0, $onConnectionTimeout, $this->connectionTimeout); + } + + public function __destruct() + { + $this->timeoutQueue->remove($this->client, 0); + } + + public function ping(int $timeout): void + { + $this->pingTimeout = \time() + $timeout; + + if (!$this->callbacks) { + $this->timeoutQueue->update($this->client, 0, $timeout); + } + } + + public function insert(int $streamId, \Closure $onTimeout, int $timeout): void + { + \assert($streamId > 0); + + $this->timeoutQueue->insert($this->client, $streamId, $this->onStreamTimeout, $timeout); + $this->callbacks[$streamId] = $onTimeout; + $this->timeoutQueue->suspend($this->client, 0); + } + + public function update(int $streamId, int $timeout): void + { + \assert($streamId > 0); + + $this->timeoutQueue->update($this->client, $streamId, $timeout); + } + + public function suspend(int $streamId): void + { + \assert($streamId > 0); + + $this->timeoutQueue->suspend($this->client, $streamId); + } + + public function remove(int $streamId): void + { + \assert($streamId > 0); + + $this->timeoutQueue->remove($this->client, $streamId); + unset($this->callbacks[$streamId]); + + if (!$this->callbacks) { + $this->timeoutQueue->update($this->client, 0, $this->connectionTimeout); + } + } +} diff --git a/src/Driver/Internal/TimeoutQueue.php b/src/Driver/Internal/TimeoutQueue.php index 2b9f20d6..2e6b2e1d 100644 --- a/src/Driver/Internal/TimeoutQueue.php +++ b/src/Driver/Internal/TimeoutQueue.php @@ -75,11 +75,19 @@ private function makeId(Client $client, int $streamId): string public function update(Client $client, int $streamId, int $timeout): void { $cacheId = $this->makeId($client, $streamId); - \assert(isset($this->callbacks[$cacheId])); + \assert(isset($this->callbacks[$cacheId], $this->streamNames[$client][$streamId])); $this->timeoutCache->update($cacheId, $this->now + $timeout); } + public function suspend(Client $client, int $streamId): void + { + $cacheId = $this->makeId($client, $streamId); + \assert(isset($this->callbacks[$cacheId], $this->streamNames[$client][$streamId])); + + $this->timeoutCache->clear($cacheId); + } + /** * Remove the given stream ID. */ diff --git a/test/Driver/Http2DriverTest.php b/test/Driver/Http2DriverTest.php index 1cedf045..7a86c4de 100644 --- a/test/Driver/Http2DriverTest.php +++ b/test/Driver/Http2DriverTest.php @@ -839,7 +839,7 @@ public function testStreamClosesWhileAwaitingResponseRead(): void $requestQueue->push(self::packFrame(\pack("N", 0), Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, 1)); - delay(0); // Invoke onDispose handler. + delay(0.1); // Invoke onDispose handler. self::assertTrue($invoked); }