Skip to content

Commit

Permalink
Use timeouts to avoid long buffering in CompressionMiddleware
Browse files Browse the repository at this point in the history
Completely removed chunkSize param in for v3.

Related to #324.
  • Loading branch information
trowski committed May 30, 2022
1 parent 23d94c8 commit 827b1a4
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 86 deletions.
88 changes: 88 additions & 0 deletions examples/event-source.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env php
<?php

require dirname(__DIR__) . "/vendor/autoload.php";

use Amp\ByteStream;
use Amp\ByteStream\ReadableIterableStream;
use Amp\Http\Server\DefaultErrorHandler;
use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler\ClosureRequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Server\SocketHttpServer;
use Amp\Http\Status;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Socket;
use Monolog\Logger;
use Monolog\Processor\PsrLogMessageProcessor;
use function Amp\delay;
use function Amp\trapSignal;

// Run this script, then visit http://localhost:1337/ in your browser.

$html = <<<HTML
<html lang="en">
<head>
<title>Event Source Demo</title>
</head>
<body>
<script>
const eventSource = new EventSource('/events');
const eventList = document.createElement('ol');
document.body.appendChild(eventList);
eventSource.addEventListener('notification', function (e) {
const element = document.createElement('li');
element.textContent = 'Message: ' + e.data;
eventList.appendChild(element);
});
</script>
</body>
</html>
HTML;

$logHandler = new StreamHandler(ByteStream\getStdout());
$logHandler->pushProcessor(new PsrLogMessageProcessor());
$logHandler->setFormatter(new ConsoleFormatter);
$logger = new Logger('server');
$logger->pushHandler($logHandler);

$server = new SocketHttpServer($logger, enableCompression: true);

$server->expose(new Socket\InternetAddress("0.0.0.0", 1337));
$server->expose(new Socket\InternetAddress("[::]", 1337));

$server->start(new ClosureRequestHandler(function (Request $request) use ($html): Response {
$path = $request->getUri()->getPath();

if ($path === '/') {
return new Response(
status: Status::OK,
headers: ["content-type" => "text/html; charset=utf-8"],
body: $html,
);
}

if ($path === '/events') {
// We stream the response here, one event every 500 ms.
return new Response(
status: Status::OK,
headers: ["content-type" => "text/event-stream; charset=utf-8"],
body: new ReadableIterableStream((function () {
for ($i = 0; $i < 30; $i++) {
delay(0.5);
yield "event: notification\ndata: Event {$i}\n\n";
}
})()),
);
}

return new Response(Status::NOT_FOUND);
}), new DefaultErrorHandler());

// Await SIGINT or SIGTERM to be received.
$signal = trapSignal([\SIGINT, \SIGTERM]);

$logger->info(sprintf("Received signal %d, stopping HTTP server", $signal));

$server->stop();
54 changes: 0 additions & 54 deletions examples/stream.php

This file was deleted.

74 changes: 42 additions & 32 deletions src/Middleware/CompressionMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

use Amp\ByteStream\ReadableIterableStream;
use Amp\ByteStream\ReadableStream;
use Amp\CancelledException;
use Amp\Http\Server\Middleware;
use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler;
use Amp\Http\Server\Response;
use Amp\TimeoutCancellation;
use cash\LRUCache;

final class CompressionMiddleware implements Middleware
Expand All @@ -16,20 +18,19 @@ final class CompressionMiddleware implements Middleware

/** @link http://webmasters.stackexchange.com/questions/31750/what-is-recommended-minimum-object-size-for-deflate-performance-benefits */
public const DEFAULT_MINIMUM_LENGTH = 860;
public const DEFAULT_CHUNK_SIZE = 8192;
public const DEFAULT_BUFFER_TIMEOUT = 0.1;
public const DEFAULT_CONTENT_TYPE_REGEX = '#^(?:text/.*+|[^/]*+/xml|[^+]*\+xml|application/(?:json|(?:x-)?javascript))$#i';

private readonly LRUCache $contentTypeCache;

/**
* @param positive-int $minimumLength Minimum body length before body is compressed.
* @param positive-int $chunkSize Minimum chunk size before being compressed.
* @param string $contentRegex
*/
public function __construct(
private readonly int $minimumLength = self::DEFAULT_MINIMUM_LENGTH,
private readonly int $chunkSize = self::DEFAULT_CHUNK_SIZE,
private readonly string $contentRegex = self::DEFAULT_CONTENT_TYPE_REGEX
private readonly string $contentRegex = self::DEFAULT_CONTENT_TYPE_REGEX,
private readonly float $bufferTimeout = self::DEFAULT_BUFFER_TIMEOUT,
) {
if (!\extension_loaded('zlib')) {
throw new \Error(__CLASS__ . ' requires ext-zlib');
Expand All @@ -40,9 +41,8 @@ public function __construct(
throw new \Error("The minimum length must be positive");
}

/** @psalm-suppress TypeDoesNotContainType */
if ($chunkSize < 1) {
throw new \Error("The chunk size must be positive");
if ($bufferTimeout <= 0) {
throw new \Error("The buffer timeout must be positive");
}

$this->contentTypeCache = new LRUCache(self::MAX_CACHE_SIZE);
Expand Down Expand Up @@ -106,20 +106,9 @@ public function handleRequest(Request $request, RequestHandler $requestHandler):
$body = $response->getBody();
$bodyBuffer = '';

if ($contentLength === null) {
do {
$bodyBuffer .= $chunk = $body->read();

if (isset($bodyBuffer[$this->minimumLength])) {
break;
}

if ($chunk === null) {
// Body is not large enough to compress.
$response->setBody($bodyBuffer);
return $response;
}
} while (true);
if ($contentLength === null && !$this->shouldCompress($body, $bodyBuffer)) {
$response->setBody($bodyBuffer);
return $response;
}

$mode = match ($encoding) {
Expand All @@ -146,38 +135,59 @@ public function handleRequest(Request $request, RequestHandler $requestHandler):

/** @psalm-suppress InvalidArgument Psalm stubs are out of date, deflate_init returns a \DeflateContext */
$response->setBody(
new ReadableIterableStream(self::readBody($context, $body, $bodyBuffer, $this->chunkSize))
new ReadableIterableStream(self::readBody($context, $body, $bodyBuffer))
);

return $response;
}

private function shouldCompress(ReadableStream $body, string &$bodyBuffer): bool
{
try {
$cancellation = new TimeoutCancellation($this->bufferTimeout);

do {
$bodyBuffer .= $chunk = $body->read($cancellation);

if (isset($bodyBuffer[$this->minimumLength])) {
return true;
}

if ($chunk === null) {
return false;
}
} while (true);
} catch (CancelledException) {
// Not enough bytes buffered within timeout to determine body size, so use compression by default.
}

return true;
}

/**
* @psalm-suppress InvalidArgument
*/
private static function readBody(
\DeflateContext $context,
ReadableStream $body,
string $bodyBuffer,
int $chunkSize,
string $chunk,
): \Generator {
do {
if (isset($bodyBuffer[$chunkSize - 1])) {
if (false === $bodyBuffer = \deflate_add($context, $bodyBuffer, \ZLIB_SYNC_FLUSH)) {
if ($chunk !== '') {
if (false === $chunk = \deflate_add($context, $chunk, \ZLIB_SYNC_FLUSH)) {
throw new \RuntimeException("Failed adding data to deflate context");
}

yield $bodyBuffer;
$bodyBuffer = '';
yield $chunk;
}

$bodyBuffer .= $chunk = $body->read();
$chunk = $body->read();
} while ($chunk !== null);

if (false === $bodyBuffer = \deflate_add($context, $bodyBuffer, \ZLIB_FINISH)) {
throw new \RuntimeException("Failed adding data to deflate context");
if (false === $chunk = \deflate_add($context, '', \ZLIB_FINISH)) {
throw new \RuntimeException("Failed finishing deflate context");
}

yield $bodyBuffer;
yield $chunk;
}
}

1 comment on commit 827b1a4

@Nek-
Copy link

@Nek- Nek- commented on 827b1a4 May 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

Please sign in to comment.