Compression is incompatible with streaming responses #324

Closed
Nek- opened this issue Nov 14, 2021 · 5 comments

Nek- commented Nov 14, 2021

I tried to build a very simple SSE server (code below). It did not work until I disabled compression. Since I assume streamed responses should work with compression enabled, I am opening an issue about it.

use Amp\ByteStream\ResourceOutputStream;
use Amp\Http\Server\HttpServer;
use Amp\Http\Server\RequestHandler\CallableRequestHandler;
use Amp\Http\Server\Response;
use Amp\Http\Status;
use Amp\Log\ConsoleFormatter;
use Amp\Log\StreamHandler;
use Amp\Socket;
use Amp\Http\Server\Request;
use Monolog\Logger;
use Amp\ByteStream\IteratorStream;
use Amp\Producer;


function newEvent() {
    $id = mt_rand(1, 1000);
    return ['id' => $id, 'title' => 'title ' . $id, 'content' => 'content ' . $id];
}

$events = [
    newEvent(),
    newEvent(),
];

Amp\Loop::run(static function () use (&$events) {
    $cert = new Socket\Certificate(__DIR__ . '/../ssl/cert.pem', __DIR__ . '/../ssl/key.pem');

    $context = (new Socket\BindContext)
        ->withTlsContext((new Socket\ServerTlsContext)->withDefaultCertificate($cert));

    $servers = [
        Socket\Server::listen("0.0.0.0:1337"),
        Socket\Server::listen("[::]:1337"),
        Socket\Server::listen("0.0.0.0:1338", $context),
        Socket\Server::listen("[::]:1338", $context),
    ];

    $logHandler = new StreamHandler(new ResourceOutputStream(STDOUT));
    $logHandler->setFormatter(new ConsoleFormatter);
    $logger = new Logger('server');
    $logger->pushHandler($logHandler);

    $server = new HttpServer($servers, new CallableRequestHandler(static function (Request $request) use (&$events) {
        if ($request->getUri()->getPath() === '/') {
            return new Response(
                Status::OK,
                [
                    "content-type" => "text/html; charset=utf-8"
                ],
                <<<FRONT
                <!DOCTYPE html>
                <html lang="en">
                    <head>
                    <title>Yo</title>
                    </head>
                    <body>
                        <h1>Hello World!</h1>
                        <div id="news"></div>
                        <script>
                        //*
                        var news = document.getElementById('news');
                        const evtSource = new EventSource("/sse");
                        evtSource.addEventListener('news', function (event) {
                            news.innerHTML = news.innerHTML + "<p>"+event.data+"</p>";
                        });
                        //*/
                        </script>            
                    </body>            
                </html>
                FRONT
            );
        }

        if ($request->getUri()->getPath() === '/sse') {
            return new Response(
                Status::OK,
                [
                    'Access-Control-Allow-Origin' => '*',
                    'Content-Type' => 'text/event-stream',
                    'Cache-Control' => 'no-cache',
                    'X-Accel-Buffering' => 'no'
                ],
                new IteratorStream(new Producer(function (callable $emit) use (&$events) {
                        while(true) {
                            if (empty($events)) {
                                yield new \Amp\Delayed(10);
                            } else {
                                $data = json_encode(array_pop($events));
                                yield $emit(
                                    "event: news\ndata: $data\n\n"
                                );
                            }
                        }
                    }
                ))
            );
        }

        return new Response(Status::NOT_FOUND, ["content-type" => "text/plain; charset=utf-8"], '404 Not found');

    // uncomment the option part to make it work
    }), $logger/*, (new \Amp\Http\Server\Options())->withoutCompression()*/);

    yield $server->start();

    // Stop the server when SIGINT is received (this is technically optional, but it is best to call Server::stop()).
    Amp\Loop::onSignal(\SIGINT, static function (string $watcherId) use ($server) {
        Amp\Loop::cancel($watcherId);
        yield $server->stop();
    });
});

Using this works:

use function Amp\Http\Server\Middleware\stack;

$server = new HttpServer($servers, stack(new CallableRequestHandler(static function (Request $request) use (&$events) {
    // ...
}), new \Amp\Http\Server\Middleware\CompressionMiddleware(12, 1)), $logger);
// Note: 12 is the minimum length, roughly the smallest SSE message; 1 is the chunk size.

trowski commented Apr 29, 2022

Sorry for the delay in responding. It was clear you figured this out quickly, which is probably why none of us felt the need to reply.

CompressionMiddleware buffers a certain number of bytes before compressing them to be more efficient. I wonder if using compression with SSE is wise. You may actually use more data for small payloads. If you are sending larger payloads on average, then reducing the number of bytes buffered in the constructor is exactly the solution you needed.
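
To make the trade-off above concrete, here is a minimal sketch using PHP's zlib extension directly. It does not go through CompressionMiddleware; the message text and the batch size of 100 are arbitrary assumptions chosen for illustration. It compares raw and gzipped sizes for one small message and for a batch, and shows how many bytes a streaming compressor emits when it is not flushed.

```php
<?php
// Illustration only: plain zlib calls, not the CompressionMiddleware code path.

$message = "event: news\ndata: hello\n\n";

// gzip adds a fixed header and trailer, so a single tiny message grows.
$small = gzencode($message);
printf("single message: %d bytes raw, %d bytes gzipped\n", strlen($message), strlen($small));

// Many similar messages compress well once there is enough input.
$batch = str_repeat($message, 100);
$large = gzencode($batch);
printf("100 messages: %d bytes raw, %d bytes gzipped\n", strlen($batch), strlen($large));

// A streaming compressor that is not flushed holds small writes back.
$context = deflate_init(ZLIB_ENCODING_RAW);
$pending = deflate_add($context, $message, ZLIB_NO_FLUSH);
printf("bytes emitted without a flush: %d\n", strlen($pending));
```

The last line is the part that matters for event streams: until the compressor is flushed or receives enough input, the client may see nothing.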

@trowski trowski closed this as completed Apr 29, 2022
@kelunik kelunik reopened this Apr 29, 2022

Nek- commented Apr 29, 2022

The best solution would actually be to buffer only the message. I think the current behavior is far from optimal.

FYI, I was looking into this because I was trying to build my own Mercure (mercure.rocks) implementation.


trowski commented Apr 29, 2022

I missed the obvious here and @kelunik pointed it out in chat. I guess we should change the behavior of CompressionMiddleware for text/event-stream rather than forcing users to disable compression for certain routes.
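
One way to picture "buffer only the message" is a compressor that is flushed after every event. The sketch below uses PHP's zlib extension on its own. `compressEvents()` is a hypothetical helper written for this illustration, not part of amphp/http-server and not necessarily how the middleware was changed.

```php
<?php
// Sketch only: compressEvents() is a hypothetical helper, not part of amphp/http-server.

/**
 * Feeds all events through one gzip stream and flushes after every event,
 * so the bytes for an event are available as soon as it is written.
 *
 * @param iterable<string> $events
 * @return \Generator<int, string>
 */
function compressEvents(iterable $events): \Generator
{
    $context = deflate_init(ZLIB_ENCODING_GZIP);

    foreach ($events as $event) {
        // ZLIB_SYNC_FLUSH forces out everything buffered so far.
        yield deflate_add($context, $event, ZLIB_SYNC_FLUSH);
    }

    // ZLIB_FINISH writes the final block and the gzip trailer.
    yield deflate_add($context, '', ZLIB_FINISH);
}

$events = [
    "event: news\ndata: first\n\n",
    "event: news\ndata: second\n\n",
];

// Decode chunk by chunk, as a client would, to check each event arrives whole.
$inflate = inflate_init(ZLIB_ENCODING_GZIP);
$decoded = [];
foreach (compressEvents($events) as $chunk) {
    $decoded[] = inflate_add($inflate, $chunk);
}
```

Flushing after every event costs a few extra bytes per flush, so this trades some compression ratio for latency.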


trowski commented Apr 29, 2022

We also have the stream threshold setting that gets in the way of event streaming. Looks like it would be advantageous to add an event-stream example too.

trowski added a commit that referenced this issue May 29, 2022
Completely removed chunkSize param in for v3.

Related to #324.
@trowski trowski added the has PR label May 29, 2022
trowski added a commit that referenced this issue May 30, 2022
Completely removed chunkSize param in for v3.

Related to #324.
@MBauerDC

In the merged change, bufferTimeout is a constructor parameter for CompressionMiddleware, but the property is not set in the constructor. This leads to bufferTimeout always being null.

I have prepared the (single-line) change in a PR here: #339
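
For readers unfamiliar with this class of bug, the sketch below reproduces it with two hypothetical classes. They are not the amphp/http-server sources, and the names and the default value of 0.1 are assumptions for illustration.

```php
<?php
// Hypothetical classes for illustration; these are not the amphp/http-server sources.

final class BrokenSketch
{
    /** @var float|null */
    private $bufferTimeout;

    public function __construct(float $bufferTimeout = 0.1)
    {
        // Bug: the parameter is accepted but never stored on the object.
    }

    public function getBufferTimeout(): ?float
    {
        return $this->bufferTimeout;
    }
}

final class FixedSketch
{
    /** @var float|null */
    private $bufferTimeout;

    public function __construct(float $bufferTimeout = 0.1)
    {
        // The single-line fix: assign the parameter to the property.
        $this->bufferTimeout = $bufferTimeout;
    }

    public function getBufferTimeout(): ?float
    {
        return $this->bufferTimeout;
    }
}

$broken = (new BrokenSketch(0.5))->getBufferTimeout();
$fixed = (new FixedSketch(0.5))->getBufferTimeout();
var_dump($broken, $fixed);
```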
