Skip to content

Commit

Permalink
Use timeouts to avoid long buffering in CompressionMiddleware (#334)
Browse files Browse the repository at this point in the history
Closes #324.
  • Loading branch information
trowski authored May 30, 2022
1 parent 0f431a4 commit 3f24c16
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 71 deletions.
84 changes: 84 additions & 0 deletions examples/event-source.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#!/usr/bin/env php
<?php

require \dirname(__DIR__) . "/vendor/autoload.php";

use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Delayed;
use Amp\Http\Server\HttpServer;
use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler\CallableRequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Status;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Producer;
use Amp\Socket;
use Monolog\Logger;

// Run this script, then visit http://localhost:1337/ in your browser.

$html = <<<HTML
<html lang="en">
<head>
<title>Event Source Demo</title>
</head>
<body>
<script>
const eventSource = new EventSource('/events');
const eventList = document.createElement('ol');
document.body.appendChild(eventList);
eventSource.addEventListener('notification', function (e) {
const element = document.createElement('li');
element.textContent = 'Message: ' + e.data;
eventList.appendChild(element);
});
</script>
</body>
</html>
HTML;

Amp\Loop::run(function () use ($html) {
$servers = [
Socket\Server::listen("0.0.0.0:1337"),
Socket\Server::listen("[::]:1337"),
];

$logHandler = new StreamHandler(new ResourceOutputStream(\STDOUT));
$logHandler->setFormatter(new ConsoleFormatter);
$logger = new Logger('server');
$logger->pushHandler($logHandler);

$server = new HttpServer($servers, new CallableRequestHandler(function (Request $request) use ($html) {
$path = $request->getUri()->getPath();

if ($path === '/') {
return new Response(Status::OK, [
"content-type" => "text/html; charset=utf-8",
], $html);
}

if ($path === '/events') {
// We stream the response here, one event every 500 ms.
return new Response(Status::OK, [
"content-type" => "text/event-stream; charset=utf-8",
], new IteratorStream(new Producer(function (callable $emit) {
for ($i = 0; $i < 30; $i++) {
yield new Delayed(500);
yield $emit("event: notification\ndata: Event {$i}\n\n");
}
})));
}

return new Response(Status::NOT_FOUND);
}), $logger);

yield $server->start();

// Stop the server when SIGINT is received (this is technically optional, but it is best to call Server::stop()).
Amp\Loop::onSignal(SIGINT, function (string $watcherId) use ($server) {
Amp\Loop::cancel($watcherId);
yield $server->stop();
});
});
53 changes: 0 additions & 53 deletions examples/stream.php

This file was deleted.

72 changes: 54 additions & 18 deletions src/Middleware/CompressionMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
use Amp\Http\Server\Middleware;
use Amp\Http\Server\Request;
use Amp\Http\Server\RequestHandler;
use Amp\Loop;
use Amp\Producer;
use Amp\Promise;
use Amp\TimeoutException;
use cash\LRUCache;

final class CompressionMiddleware implements Middleware
Expand All @@ -18,6 +20,7 @@ final class CompressionMiddleware implements Middleware
/** @link http://webmasters.stackexchange.com/questions/31750/what-is-recommended-minimum-object-size-for-deflate-performance-benefits */
const DEFAULT_MINIMUM_LENGTH = 860;
const DEFAULT_CHUNK_SIZE = 8192;
const DEFAULT_BUFFER_TIMEOUT = 100;
const DEFAULT_CONTENT_TYPE_REGEX = '#^(?:text/.*+|[^/]*+/xml|[^+]*\+xml|application/(?:json|(?:x-)?javascript))$#i';

/** @var int Minimum body length before body is compressed. */
Expand All @@ -32,10 +35,14 @@ final class CompressionMiddleware implements Middleware
/** @var LRUCache */
private $contentTypeCache;

/** @var int */
private $bufferTimeout;

public function __construct(
int $minimumLength = self::DEFAULT_MINIMUM_LENGTH,
int $chunkSize = self::DEFAULT_CHUNK_SIZE,
string $contentRegex = self::DEFAULT_CONTENT_TYPE_REGEX
string $contentRegex = self::DEFAULT_CONTENT_TYPE_REGEX,
int $bufferTimeout = self::DEFAULT_BUFFER_TIMEOUT
) {
if (!\extension_loaded('zlib')) {
throw new \Error(__CLASS__ . ' requires ext-zlib');
Expand All @@ -49,6 +56,9 @@ public function __construct(
throw new \Error("The chunk size must be positive");
}

if ($bufferTimeout < 1) {
throw new \Error("The buffer timeout must be positive");
}
$this->contentTypeCache = new LRUCache(self::MAX_CACHE_SIZE);

$this->minimumLength = $minimumLength;
Expand Down Expand Up @@ -120,20 +130,30 @@ public function deflate(Request $request, RequestHandler $requestHandler): \Gene
$body = $response->getBody();
$bodyBuffer = '';

$promise = $body->read();

if ($contentLength === null) {
do {
$bodyBuffer .= $chunk = yield $body->read();
$expiration = Loop::now() + $this->bufferTimeout;

if (isset($bodyBuffer[$this->minimumLength])) {
break;
}
try {
do {
$bodyBuffer .= $chunk = yield Promise\timeout($promise, \max(1, $expiration - Loop::now()));

if ($chunk === null) {
// Body is not large enough to compress.
$response->setBody($bodyBuffer);
return $response;
}
} while (true);
if (isset($bodyBuffer[$this->minimumLength])) {
break;
}

if ($chunk === null) {
// Body is not large enough to compress.
$response->setBody($bodyBuffer);
return $response;
}

$promise = $body->read();
} while (true);
} catch (TimeoutException $exception) {
// Failed to buffer enough bytes within timeout, so continue to compressing body anyway.
}
}

switch ($encoding) {
Expand Down Expand Up @@ -165,22 +185,38 @@ public function deflate(Request $request, RequestHandler $requestHandler): \Gene
$response->setHeader("content-encoding", $encoding);
$response->addHeader("vary", "accept-encoding");

$iterator = new Producer(function (callable $emit) use ($resource, $body, $bodyBuffer) {
$iterator = new Producer(function (callable $emit) use ($resource, $body, $promise, $bodyBuffer) {
do {
if (isset($bodyBuffer[$this->chunkSize - 1])) {
try {
$expiration = Loop::now() + $this->bufferTimeout;

while (!isset($bodyBuffer[$this->chunkSize - 1])) {
$bodyBuffer .= $chunk = yield $bodyBuffer === ''
? $promise
: Promise\timeout($promise, \max(1, $expiration - Loop::now()));

if ($chunk === null) {
break 2;
}

$promise = $body->read();
}
} catch (TimeoutException $exception) {
// Emit the bytes we do have.
}

if ($bodyBuffer !== '') {
if (false === $bodyBuffer = \deflate_add($resource, $bodyBuffer, \ZLIB_SYNC_FLUSH)) {
throw new \RuntimeException("Failed adding data to deflate context");
}

yield $emit($bodyBuffer);
$bodyBuffer = '';
}

$bodyBuffer .= $chunk = yield $body->read();
} while ($chunk !== null);
} while (true);

if (false === $bodyBuffer = \deflate_add($resource, $bodyBuffer, \ZLIB_FINISH)) {
throw new \RuntimeException("Failed adding data to deflate context");
throw new \RuntimeException("Failed finishing deflate context");
}

$emit($bodyBuffer);
Expand Down

0 comments on commit 3f24c16

Please sign in to comment.