Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Iteratively encode JSON responses #8013

Merged
merged 21 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8013.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Iteratively encode JSON to avoid blocking the reactor.
90 changes: 82 additions & 8 deletions synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import urllib
from http import HTTPStatus
from io import BytesIO
from typing import Any, Callable, Dict, Tuple, Union
from typing import Any, Callable, Dict, Iterator, List, Tuple, Union

import jinja2
from canonicaljson import encode_canonical_json, encode_pretty_printed_json
from canonicaljson import iterencode_canonical_json, iterencode_pretty_printed_json
from zope.interface import implementer

from twisted.internet import defer
from twisted.internet import defer, interfaces
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
Expand Down Expand Up @@ -499,6 +500,71 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
pass


@implementer(interfaces.IPullProducer)
class _ByteProducer:
"""
Iteratively write bytes to the request.
"""

# The minimum number of bytes for each chunk. Note that the last chunk will
# usually be smaller than this.
min_chunk_size = 1024

    def __init__(
        self, request: Request, iterator: Iterator[bytes],
    ):
        """
        Args:
            request: The request to write the response bytes to.
            iterator: Yields the encoded chunks of the response body; it is
                consumed incrementally, one reactor tick at a time.
        """
        self._request = request
        self._iterator = iterator

def start(self):
clokep marked this conversation as resolved.
Show resolved Hide resolved
self._request.registerProducer(self, False)

def _send_data(self, data: List[bytes]) -> None:
"""
Send a list of strings as a response to the request.
"""
if not data:
return
self._request.write(b"".join(data))

def resumeProducing(self):
clokep marked this conversation as resolved.
Show resolved Hide resolved
# We've stopped producing in the meantime.
if not self._request:
return

# Get the next chunk and write it to the request. Calling write will
# spin the reactor (and might be re-entrant).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this true? We're' not awaiting here so the reactor should not be able to spin?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's cut-and-pasted from elsewhere; but I seem to remember that it is true that this can be re-entrant.

Copy link
Member Author

@clokep clokep Aug 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was originally cut-and-pasted from the NoRangeStaticProducer and then tweaked a bit to be more readable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some logging and I actually wasn't able to cause this to be re-entrant?

Anyway the question of "how does the reactor spin?" is convoluted, but I think I found the answer:

The result is that in one reactor "tick" this should generate <~1 KB of data.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, right interesting. Since this function is not a coroutine I don't think this can yield back to the reactor half way through the function

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, it "yields" when the function runs off, NOT in the middle of the function.

Do you think any changes are needed here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I still don't really see what this comment is trying to convey. write may or may not cause the reactor to wake up immediately, and I don't see why we are trying to highlight that.

If we want to highlight that this function may be re-entrant, it'd be good to mention what the code is doing to make that safe (I guess something to do with the fact that the write is the last thing we do in the function, and its only during write that we can become re-entrant?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is important that you check if self._request still exists since something could have called stopProducing.

Maybe it isn't useful to clarify -- I can remove the part of the comment if you'd like (or maybe move it above the self._request check?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Saying that at by the check I think would be best

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@erikjohnston I clarified a bunch of comments! Let me know if it is better now!

buffer = []
buffered_bytes = 0
while buffered_bytes < self.min_chunk_size:
try:
data = next(self._iterator)
buffer.append(data)
buffered_bytes += len(data)
clokep marked this conversation as resolved.
Show resolved Hide resolved
except StopIteration:
# Everything is serialized, write any data, then finalize the
# producer.
self._send_data(buffer)
self._request.unregisterProducer()
self._request.finish()
self.stopProducing()
return

self._send_data(buffer)

def stopProducing(self):
clokep marked this conversation as resolved.
Show resolved Hide resolved
self._generator = None # type: ignore
clokep marked this conversation as resolved.
Show resolved Hide resolved
self._request = None


def _encode_json_bytes(json_object: Any) -> Iterator[bytes]:
    """
    Lazily serialize an object to JSON, yielding the output piecewise as
    UTF-8 encoded bytes.
    """
    yield from (piece.encode("utf-8") for piece in json_encoder.iterencode(json_object))


def respond_with_json(
request: Request,
code: int,
Expand Down Expand Up @@ -533,15 +599,23 @@ def respond_with_json(
return None

if pretty_print:
json_bytes = encode_pretty_printed_json(json_object) + b"\n"
encoder = iterencode_pretty_printed_json
else:
if canonical_json or synapse.events.USE_FROZEN_DICTS:
# canonicaljson already encodes to bytes
json_bytes = encode_canonical_json(json_object)
encoder = iterencode_canonical_json
else:
json_bytes = json_encoder.encode(json_object).encode("utf-8")
encoder = _encode_json_bytes

request.setResponseCode(code)
request.setHeader(b"Content-Type", b"application/json")
request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")

return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors)
if send_cors:
set_cors_headers(request)

producer = _ByteProducer(request, encoder(json_object))
producer.start()
return NOT_DONE_YET


def respond_with_json_bytes(
Expand Down
2 changes: 1 addition & 1 deletion synapse/python_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"jsonschema>=2.5.1",
"frozendict>=1",
"unpaddedbase64>=1.1.0",
"canonicaljson>=1.2.0",
"canonicaljson>=1.3.0",
# we use the type definitions added in signedjson 1.1.
"signedjson>=1.1.0",
"pynacl>=1.2.1",
Expand Down
6 changes: 3 additions & 3 deletions synapse/rest/key/v2/remote_key_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
import logging
from typing import Dict, Set

from canonicaljson import encode_canonical_json, json
from canonicaljson import json
from signedjson.sign import sign_json

from synapse.api.errors import Codes, SynapseError
from synapse.crypto.keyring import ServerKeyFetcher
from synapse.http.server import DirectServeJsonResource, respond_with_json_bytes
from synapse.http.server import DirectServeJsonResource, respond_with_json
from synapse.http.servlet import parse_integer, parse_json_object_from_request

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -223,4 +223,4 @@ async def query_keys(self, request, query, query_remote_on_cache_miss=False):

results = {"server_keys": signed_keys}

respond_with_json_bytes(request, 200, encode_canonical_json(results))
respond_with_json(request, 200, results, canonical_json=True)
1 change: 0 additions & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def _callback(request, **kwargs):

self.assertEqual(channel.result["code"], b"200")
self.assertNotIn("body", channel.result)
self.assertEqual(channel.headers.getRawHeaders(b"Content-Length"), [b"15"])


class OptionsResourceTests(unittest.TestCase):
Expand Down