diff --git a/CHANGES.md b/CHANGES.md index 1cdb0e3afcc3..bd8cf1a11fd4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,8 +1,59 @@ +Synapse 1.17.0rc1 (2020-07-09) +============================== + +Bugfixes +-------- + +- Fix inconsistent handling of upper and lower case in email addresses when used as identifiers for login, etc. Contributed by @dklimpel. ([\#7021](https://github.com/matrix-org/synapse/issues/7021)) +- Fix "Tried to close a non-active scope!" error messages when opentracing is enabled. ([\#7732](https://github.com/matrix-org/synapse/issues/7732)) +- Fix incorrect error message when database CTYPE was set incorrectly. ([\#7760](https://github.com/matrix-org/synapse/issues/7760)) +- Fix to not ignore `set_tweak` actions in Push Rules that have no `value`, as permitted by the specification. ([\#7766](https://github.com/matrix-org/synapse/issues/7766)) +- Fix synctl to handle empty config files correctly. Contributed by @kotovalexarian. ([\#7779](https://github.com/matrix-org/synapse/issues/7779)) +- Fixes a long standing bug in worker mode where worker information was saved in the devices table instead of the original IP address and user agent. ([\#7797](https://github.com/matrix-org/synapse/issues/7797)) +- Fix 'stuck invites' which happen when we are unable to reject a room invite received over federation. ([\#7804](https://github.com/matrix-org/synapse/issues/7804), [\#7809](https://github.com/matrix-org/synapse/issues/7809), [\#7810](https://github.com/matrix-org/synapse/issues/7810)) + + +Updates to the Docker image +--------------------------- + +- Include libwebp in the Docker file to properly handle webp image uploads. ([\#7791](https://github.com/matrix-org/synapse/issues/7791)) + + +Improved Documentation +---------------------- + +- Improve the documentation of the non-standard JSON web token login type. ([\#7776](https://github.com/matrix-org/synapse/issues/7776)) +- Update doc links for caddy. Contributed by Nicolai Søborg. ([\#7789](https://github.com/matrix-org/synapse/issues/7789)) + + +Internal Changes +---------------- + +- Refactor getting replication updates from database. ([\#7740](https://github.com/matrix-org/synapse/issues/7740)) +- Send push notifications with a high or low priority depending upon whether they may generate user-observable effects. ([\#7765](https://github.com/matrix-org/synapse/issues/7765)) +- Use symbolic names for replication stream names. ([\#7768](https://github.com/matrix-org/synapse/issues/7768)) +- Add early returns to `_check_for_soft_fail`. ([\#7769](https://github.com/matrix-org/synapse/issues/7769)) +- Fix up `synapse.handlers.federation` to pass mypy. ([\#7770](https://github.com/matrix-org/synapse/issues/7770)) +- Convert the appserver handler to async/await. ([\#7775](https://github.com/matrix-org/synapse/issues/7775)) +- Allow to use higher versions of prometheus_client <0.9.0 which are expected to introduce no breaking changes. Contributed by Oliver Kurz. ([\#7780](https://github.com/matrix-org/synapse/issues/7780)) +- Update linting scripts and codebase to be compatible with `isort` v5. ([\#7786](https://github.com/matrix-org/synapse/issues/7786)) +- Stop populating unused table `local_invites`. ([\#7793](https://github.com/matrix-org/synapse/issues/7793)) +- Ensure that strings (not bytes) are passed into JSON serialization. ([\#7799](https://github.com/matrix-org/synapse/issues/7799)) +- Switch from simplejson to the standard library json. ([\#7800](https://github.com/matrix-org/synapse/issues/7800)) +- Add `signing_key` property to `HomeServer` to save code duplication. ([\#7805](https://github.com/matrix-org/synapse/issues/7805)) +- Improve stacktraces from exceptions in background processes. ([\#7808](https://github.com/matrix-org/synapse/issues/7808)) +- Fix various spelling errors in comments and log lines. ([\#7811](https://github.com/matrix-org/synapse/issues/7811)) + + Synapse 1.16.0 (2020-07-08) =========================== -No significant changes. +No significant changes since 1.16.0rc2. +Note that this release deprecates the `m.login.jwt` login method, renaming it +to `org.matrix.login.jwt`, as `m.login.jwt` is not part of the Matrix spec. +Otherwise the behaviour is identical. Synapse will accept both names for now, +but this may change in a future release. Synapse 1.16.0rc2 (2020-07-02) ============================== @@ -45,11 +96,6 @@ Security advisory Synapse 1.16.0rc1 (2020-07-01) ============================== -Note that this release deprecates the `m.login.jwt` login method, renaming it -to `org.matrix.login.jwt`, as `m.login.jwt` is not part of the Matrix spec. -Otherwise the behaviour is identical. Synapse will accept both names for now, -but this may change in a future release. - Features -------- diff --git a/README.rst b/README.rst index 2441b6a35cef..38376e23c2b3 100644 --- a/README.rst +++ b/README.rst @@ -215,7 +215,7 @@ Using a reverse proxy with Synapse It is recommended to put a reverse proxy such as `nginx `_, `Apache `_, -`Caddy `_ or +`Caddy `_ or `HAProxy `_ in front of Synapse. One advantage of doing so is that it means that you can expose the default https port (443) to Matrix clients without needing to run Synapse with root privileges. diff --git a/changelog.d/7021.bugfix b/changelog.d/7021.bugfix deleted file mode 100644 index 140fe37b2d29..000000000000 --- a/changelog.d/7021.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix inconsistent handling of upper and lower case in email addresses when used as identifiers for login, etc. Contributed by @dklimpel. diff --git a/changelog.d/7732.bugfix b/changelog.d/7732.bugfix deleted file mode 100644 index d5e352e141b5..000000000000 --- a/changelog.d/7732.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix "Tried to close a non-active scope!" error messages when opentracing is enabled. diff --git a/changelog.d/7760.bugfix b/changelog.d/7760.bugfix deleted file mode 100644 index f6081f3d30be..000000000000 --- a/changelog.d/7760.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix incorrect error message when database CTYPE was set incorrectly. diff --git a/changelog.d/7765.misc b/changelog.d/7765.misc deleted file mode 100644 index fa9cfd24cbdc..000000000000 --- a/changelog.d/7765.misc +++ /dev/null @@ -1 +0,0 @@ -Send push notifications with a high or low priority depending upon whether they may generate user-observable effects. diff --git a/changelog.d/7768.misc b/changelog.d/7768.misc deleted file mode 100644 index dfb3d24c7d19..000000000000 --- a/changelog.d/7768.misc +++ /dev/null @@ -1 +0,0 @@ -Use symbolic names for replication stream names. diff --git a/changelog.d/7769.misc b/changelog.d/7769.misc deleted file mode 100644 index 2e200286cebc..000000000000 --- a/changelog.d/7769.misc +++ /dev/null @@ -1 +0,0 @@ -Add early returns to `_check_for_soft_fail`. diff --git a/changelog.d/7770.misc b/changelog.d/7770.misc deleted file mode 100644 index 5b864084bec7..000000000000 --- a/changelog.d/7770.misc +++ /dev/null @@ -1 +0,0 @@ -Fix up `synapse.handlers.federation` to pass mypy. diff --git a/changelog.d/7779.bugfix b/changelog.d/7779.bugfix deleted file mode 100644 index 61de45d570ae..000000000000 --- a/changelog.d/7779.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix synctl to handle empty config files correctly. Contributed by @kotovalexarian. diff --git a/debian/changelog b/debian/changelog index 1e7d7191ad22..f7c146d77710 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.17.0rc1) stable; urgency=medium + + * New synapse release 1.17.0rc1. + + -- Synapse Packaging team Thu, 09 Jul 2020 16:53:12 +0100 + matrix-synapse-py3 (1.16.0) stable; urgency=medium * New synapse release 1.16.0. diff --git a/docker/Dockerfile b/docker/Dockerfile index 9a3cf7b3f528..093e89af6c56 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -24,6 +24,7 @@ RUN apk add \ build-base \ libffi-dev \ libjpeg-turbo-dev \ + libwebp-dev \ libressl-dev \ libxslt-dev \ linux-headers \ @@ -61,6 +62,7 @@ FROM docker.io/python:${PYTHON_VERSION}-alpine3.11 RUN apk add --no-cache --virtual .runtime_deps \ libffi \ libjpeg-turbo \ + libwebp \ libressl \ libxslt \ libpq \ diff --git a/docs/jwt.md b/docs/jwt.md new file mode 100644 index 000000000000..289d66b365b2 --- /dev/null +++ b/docs/jwt.md @@ -0,0 +1,90 @@ +# JWT Login Type + +Synapse comes with a non-standard login type to support +[JSON Web Tokens](https://en.wikipedia.org/wiki/JSON_Web_Token). In general the +documentation for +[the login endpoint](https://matrix.org/docs/spec/client_server/r0.6.1#login) +is still valid (and the mechanism works similarly to the +[token based login](https://matrix.org/docs/spec/client_server/r0.6.1#token-based)). + +To log in using a JSON Web Token, clients should submit a `/login` request as +follows: + +```json +{ + "type": "org.matrix.login.jwt", + "token": "" +} +``` + +Note that the login type of `m.login.jwt` is supported, but is deprecated. This +will be removed in a future version of Synapse. + +The `jwt` should encode the local part of the user ID as the standard `sub` +claim. In the case that the token is not valid, the homeserver must respond with +`401 Unauthorized` and an error code of `M_UNAUTHORIZED`. + +(Note that this differs from the token based logins which return a +`403 Forbidden` and an error code of `M_FORBIDDEN` if an error occurs.) + +As with other login types, there are additional fields (e.g. `device_id` and +`initial_device_display_name`) which can be included in the above request. + +## Preparing Synapse + +The JSON Web Token integration in Synapse uses the +[`PyJWT`](https://pypi.org/project/pyjwt/) library, which must be installed +as follows: + + * The relevant libraries are included in the Docker images and Debian packages + provided by `matrix.org` so no further action is needed. + + * If you installed Synapse into a virtualenv, run `/path/to/env/bin/pip + install synapse[pyjwt]` to install the necessary dependencies. + + * For other installation mechanisms, see the documentation provided by the + maintainer. + +To enable the JSON web token integration, you should then add an `jwt_config` section +to your configuration file (or uncomment the `enabled: true` line in the +existing section). See [sample_config.yaml](./sample_config.yaml) for some +sample settings. + +## How to test JWT as a developer + +Although JSON Web Tokens are typically generated from an external server, the +examples below use [PyJWT](https://pyjwt.readthedocs.io/en/latest/) directly. + +1. Configure Synapse with JWT logins: + + ```yaml + jwt_config: + enabled: true + secret: "my-secret-token" + algorithm: "HS256" + ``` +2. Generate a JSON web token: + + ```bash + $ pyjwt --key=my-secret-token --alg=HS256 encode sub=test-user + eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.Ag71GT8v01UO3w80aqRPTeuVPBIBZkYhNTJJ-_-zQIc + ``` +3. Query for the login types and ensure `org.matrix.login.jwt` is there: + + ```bash + curl http://localhost:8080/_matrix/client/r0/login + ``` +4. Login used the generated JSON web token from above: + + ```bash + $ curl http://localhost:8082/_matrix/client/r0/login -X POST \ + --data '{"type":"org.matrix.login.jwt","token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0ZXN0LXVzZXIifQ.Ag71GT8v01UO3w80aqRPTeuVPBIBZkYhNTJJ-_-zQIc"}' + { + "access_token": "", + "device_id": "ACBDEFGHI", + "home_server": "localhost:8080", + "user_id": "@test-user:localhost:8480" + } + ``` + +You should now be able to use the returned access token to query the client API. diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md index cbb8269568b3..131990001ae9 100644 --- a/docs/reverse_proxy.md +++ b/docs/reverse_proxy.md @@ -3,7 +3,7 @@ It is recommended to put a reverse proxy such as [nginx](https://nginx.org/en/docs/http/ngx_http_proxy_module.html), [Apache](https://httpd.apache.org/docs/current/mod/mod_proxy_http.html), -[Caddy](https://caddyserver.com/docs/proxy) or +[Caddy](https://caddyserver.com/docs/quick-starts/reverse-proxy) or [HAProxy](https://www.haproxy.org/) in front of Synapse. One advantage of doing so is that it means that you can expose the default https port (443) to Matrix clients without needing to run Synapse with root diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 2169082470dc..390cd7d607e4 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1979,12 +1979,39 @@ sso: #template_dir: "res/templates" -# The JWT needs to contain a globally unique "sub" (subject) claim. +# JSON web token integration. The following settings can be used to make +# Synapse JSON web tokens for authentication, instead of its internal +# password database. +# +# Each JSON Web Token needs to contain a "sub" (subject) claim, which is +# used as the localpart of the mxid. +# +# Note that this is a non-standard login type and client support is +# expected to be non-existant. +# +# See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md. # #jwt_config: -# enabled: true -# secret: "a secret" -# algorithm: "HS256" + # Uncomment the following to enable authorization using JSON web + # tokens. Defaults to false. + # + #enabled: true + + # This is either the private shared secret or the public key used to + # decode the contents of the JSON web token. + # + # Required if 'enabled' is true. + # + #secret: "provided-by-your-issuer" + + # The algorithm used to sign the JSON web token. + # + # Supported algorithms are listed at + # https://pyjwt.readthedocs.io/en/latest/algorithms.html + # + # Required if 'enabled' is true. + # + #algorithm: "provided-by-your-issuer" password_config: diff --git a/scripts-dev/check_signature.py b/scripts-dev/check_signature.py index ecda103cf7c4..6755bc528287 100644 --- a/scripts-dev/check_signature.py +++ b/scripts-dev/check_signature.py @@ -2,9 +2,9 @@ import json import logging import sys -import urllib2 import dns.resolver +import urllib2 from signedjson.key import decode_verify_key_bytes, write_signing_keys from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 diff --git a/scripts-dev/lint.sh b/scripts-dev/lint.sh index 6f1ba2293196..66b056885879 100755 --- a/scripts-dev/lint.sh +++ b/scripts-dev/lint.sh @@ -15,7 +15,7 @@ else fi echo "Linting these locations: $files" -isort -y -rc $files +isort $files python3 -m black $files ./scripts-dev/config-lint.sh flake8 $files diff --git a/setup.cfg b/setup.cfg index f2bca272e17c..a32278ea8a08 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,7 +26,6 @@ ignore=W503,W504,E203,E731,E501 [isort] line_length = 88 -not_skip = __init__.py sections=FUTURE,STDLIB,COMPAT,THIRDPARTY,TWISTED,FIRSTPARTY,TESTS,LOCALFOLDER default_section=THIRDPARTY known_first_party = synapse diff --git a/synapse/__init__.py b/synapse/__init__.py index de65ce6db89f..5bb09a37d702 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -36,7 +36,7 @@ except ImportError: pass -__version__ = "1.16.0" +__version__ = "1.17.0rc1" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/api/auth.py b/synapse/api/auth.py index e615533ea30d..768b840b6aa8 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import Optional @@ -22,7 +21,6 @@ from twisted.internet import defer from twisted.web.server import Request -import synapse.logging.opentracing as opentracing import synapse.types from synapse import event_auth from synapse.api.auth_blocking import AuthBlocking @@ -35,6 +33,7 @@ ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase +from synapse.logging import opentracing as opentracing from synapse.types import StateMap, UserID from synapse.util.caches import register_cache from synapse.util.caches.lrucache import LruCache @@ -543,7 +542,7 @@ def compute_auth_events( # Currently we ignore the `for_verification` flag even though there are # some situations where we can drop particular auth events when adding # to the event's `auth_events` (e.g. joins pointing to previous joins - # when room is publically joinable). Dropping event IDs has the + # when room is publicly joinable). Dropping event IDs has the # advantage that the auth chain for the room grows slower, but we use # the auth chain in state resolution v2 to order events, which means # care must be taken if dropping events to ensure that it doesn't diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 27a3fc9ed63a..f6792d9fc80e 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -21,7 +21,7 @@ from typing_extensions import ContextManager -from twisted.internet import defer, reactor +from twisted.internet import address, defer, reactor import synapse import synapse.events @@ -206,10 +206,30 @@ async def on_POST(self, request, device_id): if body: # They're actually trying to upload something, proxy to main synapse. - # Pass through the auth headers, if any, in case the access token - # is there. - auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) - headers = {"Authorization": auth_headers} + + # Proxy headers from the original request, such as the auth headers + # (in case the access token is there) and the original IP / + # User-Agent of the request. + headers = { + header: request.requestHeaders.getRawHeaders(header, []) + for header in (b"Authorization", b"User-Agent") + } + # Add the previous hop the the X-Forwarded-For header. + x_forwarded_for = request.requestHeaders.getRawHeaders( + b"X-Forwarded-For", [] + ) + if isinstance(request.client, (address.IPv4Address, address.IPv6Address)): + previous_host = request.client.host.encode("ascii") + # If the header exists, add to the comma-separated list of the first + # instance of the header. Otherwise, generate a new header. + if x_forwarded_for: + x_forwarded_for = [ + x_forwarded_for[0] + b", " + previous_host + ] + x_forwarded_for[1:] + else: + x_forwarded_for = [previous_host] + headers[b"X-Forwarded-For"] = x_forwarded_for + try: result = await self.http_client.post_json_get_json( self.main_uri + request.uri.decode("ascii"), body, headers=headers diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index da9a5e86d4db..f92bfb420bb7 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -98,7 +98,6 @@ def query_user(self, service, user_id): if service.url is None: return False uri = service.url + ("/users/%s" % urllib.parse.quote(user_id)) - response = None try: response = yield self.get_json(uri, {"access_token": service.hs_token}) if response is not None: # just an empty json object diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py index fca35b008c6e..65043d5b5b5f 100644 --- a/synapse/config/__main__.py +++ b/synapse/config/__main__.py @@ -16,6 +16,7 @@ if __name__ == "__main__": import sys + from synapse.config.homeserver import HomeServerConfig action = sys.argv[1] diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index ca61214454f8..b1dc7ad502b2 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -14,7 +14,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from __future__ import print_function # This file can't be called email.py because if it is, we cannot: @@ -73,7 +72,7 @@ def read_config(self, config, **kwargs): template_dir = email_config.get("template_dir") # we need an absolute path, because we change directory after starting (and - # we don't yet know what auxilliary templates like mail.css we will need). + # we don't yet know what auxiliary templates like mail.css we will need). # (Note that loading as package_resources with jinja.PackageLoader doesn't # work for the same reason.) if not template_dir: @@ -145,8 +144,8 @@ def read_config(self, config, **kwargs): or self.threepid_behaviour_email == ThreepidBehaviour.LOCAL ): # make sure we can import the required deps - import jinja2 import bleach + import jinja2 # prevent unused warnings jinja2 diff --git a/synapse/config/jwt_config.py b/synapse/config/jwt_config.py index a568726985d1..fce96b4acf14 100644 --- a/synapse/config/jwt_config.py +++ b/synapse/config/jwt_config.py @@ -45,10 +45,37 @@ def read_config(self, config, **kwargs): def generate_config_section(self, **kwargs): return """\ - # The JWT needs to contain a globally unique "sub" (subject) claim. + # JSON web token integration. The following settings can be used to make + # Synapse JSON web tokens for authentication, instead of its internal + # password database. + # + # Each JSON Web Token needs to contain a "sub" (subject) claim, which is + # used as the localpart of the mxid. + # + # Note that this is a non-standard login type and client support is + # expected to be non-existant. + # + # See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md. # #jwt_config: - # enabled: true - # secret: "a secret" - # algorithm: "HS256" + # Uncomment the following to enable authorization using JSON web + # tokens. Defaults to false. + # + #enabled: true + + # This is either the private shared secret or the public key used to + # decode the contents of the JSON web token. + # + # Required if 'enabled' is true. + # + #secret: "provided-by-your-issuer" + + # The algorithm used to sign the JSON web token. + # + # Supported algorithms are listed at + # https://pyjwt.readthedocs.io/en/latest/algorithms.html + # + # Required if 'enabled' is true. + # + #algorithm: "provided-by-your-issuer" """ diff --git a/synapse/events/builder.py b/synapse/events/builder.py index a0c4a40c2782..92aadfe7ef55 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -162,7 +162,7 @@ class EventBuilderFactory(object): def __init__(self, hs): self.clock = hs.get_clock() self.hostname = hs.hostname - self.signing_key = hs.config.signing_key[0] + self.signing_key = hs.signing_key self.store = hs.get_datastore() self.state = hs.get_state_handler() diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 687cd841ac47..a37cc9cb4a9f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -87,7 +87,7 @@ def __init__(self, hs): self.transport_layer = hs.get_federation_transport_client() self.hostname = hs.hostname - self.signing_key = hs.config.signing_key[0] + self.signing_key = hs.signing_key self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", @@ -245,7 +245,7 @@ async def get_pdu( event_id: event to fetch room_version: version of the room outlier: Indicates whether the PDU is an `outlier`, i.e. if - it's from an arbitary point in the context as opposed to part + it's from an arbitrary point in the context as opposed to part of the current block of PDUs. Defaults to `False` timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. @@ -351,7 +351,7 @@ async def _check_sigs_and_hash_and_fetch( outlier: bool = False, include_none: bool = False, ) -> List[EventBase]: - """Takes a list of PDUs and checks the signatures and hashs of each + """Takes a list of PDUs and checks the signatures and hashes of each one. If a PDU fails its signature check then we check if we have it in the database and if not then request if from the originating server of that PDU. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e704cf2f4437..86051decd49a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -717,7 +717,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: # server name is a literal IP allow_ip_literals = acl_event.content.get("allow_ip_literals", True) if not isinstance(allow_ip_literals, bool): - logger.warning("Ignorning non-bool allow_ip_literals flag") + logger.warning("Ignoring non-bool allow_ip_literals flag") allow_ip_literals = True if not allow_ip_literals: # check for ipv6 literals. These start with '['. @@ -731,7 +731,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: # next, check the deny list deny = acl_event.content.get("deny", []) if not isinstance(deny, (list, tuple)): - logger.warning("Ignorning non-list deny ACL %s", deny) + logger.warning("Ignoring non-list deny ACL %s", deny) deny = [] for e in deny: if _acl_entry_matches(server_name, e): @@ -741,7 +741,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: # then the allow list. allow = acl_event.content.get("allow", []) if not isinstance(allow, (list, tuple)): - logger.warning("Ignorning non-list allow ACL %s", allow) + logger.warning("Ignoring non-list allow ACL %s", allow) allow = [] for e in allow: if _acl_entry_matches(server_name, e): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 6bbd762681a8..860b03f7b959 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -359,7 +359,7 @@ class BaseFederationRow(object): Specifies how to identify, serialize and deserialize the different types. """ - TypeId = "" # Unique string that ids the type. Must be overriden in sub classes. + TypeId = "" # Unique string that ids the type. Must be overridden in sub classes. @staticmethod def from_data(data): diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 4e698981a4c8..12966e239bd1 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -119,7 +119,7 @@ def pending_edu_count(self) -> int: ) def send_pdu(self, pdu: EventBase, order: int) -> None: - """Add a PDU to the queue, and start the transmission loop if neccessary + """Add a PDU to the queue, and start the transmission loop if necessary Args: pdu: pdu to send @@ -129,7 +129,7 @@ def send_pdu(self, pdu: EventBase, order: int) -> None: self.attempt_new_transaction() def send_presence(self, states: Iterable[UserPresenceState]) -> None: - """Add presence updates to the queue. Start the transmission loop if neccessary. + """Add presence updates to the queue. Start the transmission loop if necessary. Args: states: presence to send diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 1ac522c8a1cd..e502c1205029 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -746,7 +746,7 @@ def invite_to_group_notification(self, destination, group_id, user_id, content): def remove_user_from_group( self, destination, group_id, requester_user_id, user_id, content ): - """Remove a user fron a group + """Remove a user from a group """ path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 506f35e39c5f..1478ee03a5ad 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -110,7 +110,7 @@ def __init__(self, hs: HomeServer): self.server_name = hs.hostname self.store = hs.get_datastore() self.federation_domain_whitelist = hs.config.federation_domain_whitelist - self.notifer = hs.get_notifier() + self.notifier = hs.get_notifier() self.replication_client = None if hs.config.worker.worker_app: @@ -176,7 +176,7 @@ async def _reset_retry_timings(self, origin): await self.store.set_destination_retry_timings(origin, None, 0, 0) # Inform the relevant places that the remote server is back up. - self.notifer.notify_remote_server_up(origin) + self.notifier.notify_remote_server_up(origin) if self.replication_client: # If we're on a worker we try and inform master about this. The # replication client doesn't hook into the notifier to avoid diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 27b0c026556c..dab13c243f78 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -70,7 +70,7 @@ def __init__(self, hs): self.keyring = hs.get_keyring() self.clock = hs.get_clock() self.server_name = hs.hostname - self.signing_key = hs.config.signing_key[0] + self.signing_key = hs.signing_key @defer.inlineCallbacks def verify_attestation(self, attestation, group_id, user_id, server_name=None): diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py index 8db8ab1b7bd2..8cb922ddc735 100644 --- a/synapse/groups/groups_server.py +++ b/synapse/groups/groups_server.py @@ -41,7 +41,7 @@ def __init__(self, hs): self.clock = hs.get_clock() self.keyring = hs.get_keyring() self.is_mine_id = hs.is_mine_id - self.signing_key = hs.config.signing_key[0] + self.signing_key = hs.signing_key self.server_name = hs.hostname self.attestations = hs.get_groups_attestation_signing() self.transport_client = hs.get_federation_transport_client() diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 904c96eeec29..92d4c6e16cc4 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -48,8 +48,7 @@ def __init__(self, hs): self.current_max = 0 self.is_processing = False - @defer.inlineCallbacks - def notify_interested_services(self, current_id): + async def notify_interested_services(self, current_id): """Notifies (pushes) all application services interested in this event. Pushing is done asynchronously, so this method won't block for any @@ -74,7 +73,7 @@ def notify_interested_services(self, current_id): ( upper_bound, events, - ) = yield self.store.get_new_events_for_appservice( + ) = await self.store.get_new_events_for_appservice( self.current_max, limit ) @@ -85,10 +84,9 @@ def notify_interested_services(self, current_id): for event in events: events_by_room.setdefault(event.room_id, []).append(event) - @defer.inlineCallbacks - def handle_event(event): + async def handle_event(event): # Gather interested services - services = yield self._get_services_for_event(event) + services = await self._get_services_for_event(event) if len(services) == 0: return # no services need notifying @@ -96,9 +94,9 @@ def handle_event(event): # query API for all services which match that user regex. # This needs to block as these user queries need to be # made BEFORE pushing the event. - yield self._check_user_exists(event.sender) + await self._check_user_exists(event.sender) if event.type == EventTypes.Member: - yield self._check_user_exists(event.state_key) + await self._check_user_exists(event.state_key) if not self.started_scheduler: @@ -115,17 +113,16 @@ def start_scheduler(): self.scheduler.submit_event_for_as(service, event) now = self.clock.time_msec() - ts = yield self.store.get_received_ts(event.event_id) + ts = await self.store.get_received_ts(event.event_id) synapse.metrics.event_processing_lag_by_event.labels( "appservice_sender" ).observe((now - ts) / 1000) - @defer.inlineCallbacks - def handle_room_events(events): + async def handle_room_events(events): for event in events: - yield handle_event(event) + await handle_event(event) - yield make_deferred_yieldable( + await make_deferred_yieldable( defer.gatherResults( [ run_in_background(handle_room_events, evs) @@ -135,10 +132,10 @@ def handle_room_events(events): ) ) - yield self.store.set_appservice_last_pos(upper_bound) + await self.store.set_appservice_last_pos(upper_bound) now = self.clock.time_msec() - ts = yield self.store.get_received_ts(events[-1].event_id) + ts = await self.store.get_received_ts(events[-1].event_id) synapse.metrics.event_processing_positions.labels( "appservice_sender" @@ -161,8 +158,7 @@ def handle_room_events(events): finally: self.is_processing = False - @defer.inlineCallbacks - def query_user_exists(self, user_id): + async def query_user_exists(self, user_id): """Check if any application service knows this user_id exists. Args: @@ -170,15 +166,14 @@ def query_user_exists(self, user_id): Returns: True if this user exists on at least one application service. """ - user_query_services = yield self._get_services_for_user(user_id=user_id) + user_query_services = self._get_services_for_user(user_id=user_id) for user_service in user_query_services: - is_known_user = yield self.appservice_api.query_user(user_service, user_id) + is_known_user = await self.appservice_api.query_user(user_service, user_id) if is_known_user: return True return False - @defer.inlineCallbacks - def query_room_alias_exists(self, room_alias): + async def query_room_alias_exists(self, room_alias): """Check if an application service knows this room alias exists. Args: @@ -193,19 +188,18 @@ def query_room_alias_exists(self, room_alias): s for s in services if (s.is_interested_in_alias(room_alias_str)) ] for alias_service in alias_query_services: - is_known_alias = yield self.appservice_api.query_alias( + is_known_alias = await self.appservice_api.query_alias( alias_service, room_alias_str ) if is_known_alias: # the alias exists now so don't query more ASes. - result = yield self.store.get_association_from_room_alias(room_alias) + result = await self.store.get_association_from_room_alias(room_alias) return result - @defer.inlineCallbacks - def query_3pe(self, kind, protocol, fields): - services = yield self._get_services_for_3pn(protocol) + async def query_3pe(self, kind, protocol, fields): + services = self._get_services_for_3pn(protocol) - results = yield make_deferred_yieldable( + results = await make_deferred_yieldable( defer.DeferredList( [ run_in_background( @@ -224,8 +218,7 @@ def query_3pe(self, kind, protocol, fields): return ret - @defer.inlineCallbacks - def get_3pe_protocols(self, only_protocol=None): + async def get_3pe_protocols(self, only_protocol=None): services = self.store.get_app_services() protocols = {} @@ -238,7 +231,7 @@ def get_3pe_protocols(self, only_protocol=None): if p not in protocols: protocols[p] = [] - info = yield self.appservice_api.get_3pe_protocol(s, p) + info = await self.appservice_api.get_3pe_protocol(s, p) if info is not None: protocols[p].append(info) @@ -263,8 +256,7 @@ def _merge_instances(infos): return protocols - @defer.inlineCallbacks - def _get_services_for_event(self, event): + async def _get_services_for_event(self, event): """Retrieve a list of application services interested in this event. Args: @@ -280,7 +272,7 @@ def _get_services_for_event(self, event): # inside of a list comprehension anymore. interested_list = [] for s in services: - if (yield s.is_interested(event, self.store)): + if await s.is_interested(event, self.store): interested_list.append(s) return interested_list @@ -288,21 +280,20 @@ def _get_services_for_event(self, event): def _get_services_for_user(self, user_id): services = self.store.get_app_services() interested_list = [s for s in services if (s.is_interested_in_user(user_id))] - return defer.succeed(interested_list) + return interested_list def _get_services_for_3pn(self, protocol): services = self.store.get_app_services() interested_list = [s for s in services if s.is_interested_in_protocol(protocol)] - return defer.succeed(interested_list) + return interested_list - @defer.inlineCallbacks - def _is_unknown_user(self, user_id): + async def _is_unknown_user(self, user_id): if not self.is_mine_id(user_id): # we don't know if they are unknown or not since it isn't one of our # users. We can't poke ASes. return False - user_info = yield self.store.get_user_by_id(user_id) + user_info = await self.store.get_user_by_id(user_id) if user_info: return False @@ -311,10 +302,9 @@ def _is_unknown_user(self, user_id): service_list = [s for s in services if s.sender == user_id] return len(service_list) == 0 - @defer.inlineCallbacks - def _check_user_exists(self, user_id): - unknown_user = yield self._is_unknown_user(user_id) + async def _check_user_exists(self, user_id): + unknown_user = await self._is_unknown_user(user_id) if unknown_user: - exists = yield self.query_user_exists(user_id) + exists = await self.query_user_exists(user_id) return exists return True diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d713a06bf918..a162392e4cb3 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time import unicodedata @@ -24,7 +23,6 @@ import bcrypt # type: ignore[import] import pymacaroons -import synapse.util.stringutils as stringutils from synapse.api.constants import LoginType from synapse.api.errors import ( AuthError, @@ -45,6 +43,7 @@ from synapse.module_api import ModuleApi from synapse.push.mailer import load_jinja2_templates from synapse.types import Requester, UserID +from synapse.util import stringutils as stringutils from synapse.util.threepids import canonicalise_email from ._base import BaseHandler diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py index 76f213723a0d..d79ffefdb563 100644 --- a/synapse/handlers/cas_handler.py +++ b/synapse/handlers/cas_handler.py @@ -12,11 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import urllib -import xml.etree.ElementTree as ET from typing import Dict, Optional, Tuple +from xml.etree import ElementTree as ET from twisted.web.client import PartialDownloadError diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3326535410e5..a94e467aef54 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1577,7 +1577,7 @@ async def on_invite_request( room_version, event.get_pdu_json(), self.hs.hostname, - self.hs.config.signing_key[0], + self.hs.signing_key, ) ) diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py index 7cb106e365ec..ecdb12a7bfc4 100644 --- a/synapse/handlers/groups_local.py +++ b/synapse/handlers/groups_local.py @@ -70,7 +70,7 @@ def __init__(self, hs): self.clock = hs.get_clock() self.keyring = hs.get_keyring() self.is_mine_id = hs.is_mine_id - self.signing_key = hs.config.signing_key[0] + self.signing_key = hs.signing_key self.server_name = hs.hostname self.notifier = hs.get_notifier() self.attestations = hs.get_groups_attestation_signing() diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index a77088e29534..88537a5be3cd 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -268,10 +268,10 @@ async def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server): url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii") auth_headers = self.federation_http_client.build_auth_headers( destination=None, - method="POST", + method=b"POST", url_bytes=url_bytes, content=content, - destination_is=id_server, + destination_is=id_server.encode("ascii"), ) headers = {b"Authorization": auth_headers} diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 665ad19b5d3b..da206e1ec112 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Optional, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from canonicaljson import encode_canonical_json, json @@ -55,6 +55,9 @@ from ._base import BaseHandler +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -349,7 +352,7 @@ def _expire_event(self, event_id): class EventCreationHandler(object): - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() self.store = hs.get_datastore() @@ -814,11 +817,17 @@ async def handle_new_client_event( 403, "This event is not allowed in this context", Codes.FORBIDDEN ) - try: - await self.auth.check_from_context(room_version, event, context) - except AuthError as err: - logger.warning("Denying new event %r because %s", event, err) - raise err + if event.internal_metadata.is_out_of_band_membership(): + # the only sort of out-of-band-membership events we expect to see here + # are invite rejections we have generated ourselves. + assert event.type == EventTypes.Member + assert event.content["membership"] == Membership.LEAVE + else: + try: + await self.auth.check_from_context(room_version, event, context) + except AuthError as err: + logger.warning("Denying new event %r because %s", event, err) + raise err # Ensure that we can round trip before trying to persist in db try: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index d585114f2d0e..c3fee116c2b3 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -1,7 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2016-2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,17 +16,21 @@ import abc import logging from http import HTTPStatus -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple, Union + +from unpaddedbase64 import encode_base64 from synapse import types -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError +from synapse.api.room_versions import EventFormatVersions +from synapse.crypto.event_signing import compute_event_reference_hash from synapse.events import EventBase +from synapse.events.builder import create_local_event_from_event_dict from synapse.events.snapshot import EventContext -from synapse.replication.http.membership import ( - ReplicationLocallyRejectInviteRestServlet, -) -from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID +from synapse.events.validator import EventValidator +from synapse.storage.roommember import RoomsForUser +from synapse.types import Collection, JsonDict, Requester, RoomAlias, RoomID, UserID from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room, user_left_room @@ -75,10 +77,6 @@ def __init__(self, hs): ) if self._is_on_event_persistence_instance: self.persist_event_storage = hs.get_storage().persistence - else: - self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client( - hs - ) # This is only used to get at ratelimit function, and # maybe_kick_guest_users. It's fine there are multiple of these as @@ -106,46 +104,28 @@ async def _remote_join( raise NotImplementedError() @abc.abstractmethod - async def _remote_reject_invite( + async def remote_reject_invite( self, + invite_event_id: str, + txn_id: Optional[str], requester: Requester, - remote_room_hosts: List[str], - room_id: str, - target: UserID, - content: dict, - ) -> Tuple[Optional[str], int]: - """Attempt to reject an invite for a room this server is not in. If we - fail to do so we locally mark the invite as rejected. + content: JsonDict, + ) -> Tuple[str, int]: + """ + Rejects an out-of-band invite we have received from a remote server Args: - requester - remote_room_hosts: List of servers to use to try and reject invite - room_id - target: The user rejecting the invite - content: The content for the rejection event + invite_event_id: ID of the invite to be rejected + txn_id: optional transaction ID supplied by the client + requester: user making the rejection request, according to the access token + content: additional content to include in the rejection event. + Normally an empty dict. Returns: - A dictionary to be returned to the client, may - include event_id etc, or nothing if we locally rejected + event id, stream_id of the leave event """ raise NotImplementedError() - async def locally_reject_invite(self, user_id: str, room_id: str) -> int: - """Mark the invite has having been rejected even though we failed to - create a leave event for it. - """ - if self._is_on_event_persistence_instance: - return await self.persist_event_storage.locally_reject_invite( - user_id, room_id - ) - else: - result = await self._locally_reject_client( - instance_name=self._event_stream_writer_instance, - user_id=user_id, - room_id=room_id, - ) - return result["stream_id"] - @abc.abstractmethod async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Notifies distributor on master process that the user has joined the @@ -290,7 +270,7 @@ async def update_membership( content: Optional[dict] = None, new_room: bool = False, require_consent: bool = True, - ) -> Tuple[Optional[str], int]: + ) -> Tuple[str, int]: """Update a user's membership in a room""" key = (room_id,) @@ -324,7 +304,7 @@ async def _update_membership( content: Optional[dict] = None, new_room: bool = False, require_consent: bool = True, - ) -> Tuple[Optional[str], int]: + ) -> Tuple[str, int]: content_specified = bool(content) if content is None: content = {} @@ -516,11 +496,17 @@ async def _update_membership( elif effective_membership_state == Membership.LEAVE: if not is_host_in_room: # perhaps we've been invited - inviter = await self._get_inviter(target.to_string(), room_id) - if not inviter: + invite = await self.store.get_invite_for_local_user_in_room( + user_id=target.to_string(), room_id=room_id + ) # type: Optional[RoomsForUser] + if not invite: raise SynapseError(404, "Not a known room") - if self.hs.is_mine(inviter): + logger.info( + "%s rejects invite to %s from %s", target, room_id, invite.sender + ) + + if self.hs.is_mine_id(invite.sender): # the inviter was on our server, but has now left. Carry on # with the normal rejection codepath. # @@ -528,10 +514,10 @@ async def _update_membership( # active on other servers. pass else: - # send the rejection to the inviter's HS. - remote_room_hosts = remote_room_hosts + [inviter.domain] - return await self._remote_reject_invite( - requester, remote_room_hosts, room_id, target, content, + # send the rejection to the inviter's HS (with fallback to + # local event) + return await self.remote_reject_invite( + invite.event_id, txn_id, requester, content, ) return await self._local_membership_update( @@ -1069,33 +1055,119 @@ async def _remote_join( return event_id, stream_id - async def _remote_reject_invite( + async def remote_reject_invite( self, + invite_event_id: str, + txn_id: Optional[str], requester: Requester, - remote_room_hosts: List[str], - room_id: str, - target: UserID, - content: dict, - ) -> Tuple[Optional[str], int]: - """Implements RoomMemberHandler._remote_reject_invite + content: JsonDict, + ) -> Tuple[str, int]: + """ + Rejects an out-of-band invite received from a remote user + + Implements RoomMemberHandler.remote_reject_invite """ + invite_event = await self.store.get_event(invite_event_id) + room_id = invite_event.room_id + target_user = invite_event.state_key + + # first of all, try doing a rejection via the inviting server fed_handler = self.federation_handler try: + inviter_id = UserID.from_string(invite_event.sender) event, stream_id = await fed_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, target.to_string(), content=content, + [inviter_id.domain], room_id, target_user, content=content ) return event.event_id, stream_id except Exception as e: - # if we were unable to reject the exception, just mark - # it as rejected on our end and plough ahead. + # if we were unable to reject the invite, we will generate our own + # leave event. # # The 'except' clause is very broad, but we need to # capture everything from DNS failures upwards # logger.warning("Failed to reject invite: %s", e) - stream_id = await self.locally_reject_invite(target.to_string(), room_id) - return None, stream_id + return await self._locally_reject_invite( + invite_event, txn_id, requester, content + ) + + async def _locally_reject_invite( + self, + invite_event: EventBase, + txn_id: Optional[str], + requester: Requester, + content: JsonDict, + ) -> Tuple[str, int]: + """Generate a local invite rejection + + This is called after we fail to reject an invite via a remote server. It + generates an out-of-band membership event locally. + + Args: + invite_event: the invite to be rejected + txn_id: optional transaction ID supplied by the client + requester: user making the rejection request, according to the access token + content: additional content to include in the rejection event. + Normally an empty dict. + """ + + room_id = invite_event.room_id + target_user = invite_event.state_key + room_version = await self.store.get_room_version(room_id) + + content["membership"] = Membership.LEAVE + + # the auth events for the new event are the same as that of the invite, plus + # the invite itself. + # + # the prev_events are just the invite. + invite_hash = invite_event.event_id # type: Union[str, Tuple] + if room_version.event_format == EventFormatVersions.V1: + alg, h = compute_event_reference_hash(invite_event) + invite_hash = (invite_event.event_id, {alg: encode_base64(h)}) + + auth_events = tuple(invite_event.auth_events) + (invite_hash,) + prev_events = (invite_hash,) + + # we cap depth of generated events, to ensure that they are not + # rejected by other servers (and so that they can be persisted in + # the db) + depth = min(invite_event.depth + 1, MAX_DEPTH) + + event_dict = { + "depth": depth, + "auth_events": auth_events, + "prev_events": prev_events, + "type": EventTypes.Member, + "room_id": room_id, + "sender": target_user, + "content": content, + "state_key": target_user, + } + + event = create_local_event_from_event_dict( + clock=self.clock, + hostname=self.hs.hostname, + signing_key=self.hs.signing_key, + room_version=room_version, + event_dict=event_dict, + ) + event.internal_metadata.outlier = True + event.internal_metadata.out_of_band_membership = True + if txn_id is not None: + event.internal_metadata.txn_id = txn_id + if requester.access_token_id is not None: + event.internal_metadata.token_id = requester.access_token_id + + EventValidator().validate_new(event, self.config) + + context = await self.state_handler.compute_event_context(event) + context.app_service = requester.app_service + stream_id = await self.event_creation_handler.handle_new_client_event( + requester, event, context, extra_users=[UserID.from_string(target_user)], + ) + return event.event_id, stream_id async def _user_joined_room(self, target: UserID, room_id: str) -> None: """Implements RoomMemberHandler._user_joined_room diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 02e0c4103d9b..897338fd54e2 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -61,21 +61,22 @@ async def _remote_join( return ret["event_id"], ret["stream_id"] - async def _remote_reject_invite( + async def remote_reject_invite( self, + invite_event_id: str, + txn_id: Optional[str], requester: Requester, - remote_room_hosts: List[str], - room_id: str, - target: UserID, content: dict, - ) -> Tuple[Optional[str], int]: - """Implements RoomMemberHandler._remote_reject_invite + ) -> Tuple[str, int]: + """ + Rejects an out-of-band invite received from a remote user + + Implements RoomMemberHandler.remote_reject_invite """ ret = await self._remote_reject_client( + invite_event_id=invite_event_id, + txn_id=txn_id, requester=requester, - remote_room_hosts=remote_room_hosts, - room_id=room_id, - user_id=target.to_string(), content=content, ) return ret["event_id"], ret["stream_id"] diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 6c7abaa57817..879c4c07c65d 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -294,6 +294,9 @@ async def get_all_typing_updates( rows.sort() limited = False + # We, unusually, use a strict limit here as we have all the rows in + # memory rather than pulling them out of the database with a `LIMIT ?` + # clause. if len(rows) > limit: rows = rows[:limit] current_id = rows[-1][0] diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 18f6a8fd292a..148eeb19dc5d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -176,7 +176,7 @@ class MatrixFederationHttpClient(object): def __init__(self, hs, tls_client_options_factory): self.hs = hs - self.signing_key = hs.config.signing_key[0] + self.signing_key = hs.signing_key self.server_name = hs.hostname real_reactor = hs.get_reactor() @@ -562,13 +562,17 @@ def build_auth_headers( Returns: list[bytes]: a list of headers to be added as "Authorization:" headers """ - request = {"method": method, "uri": url_bytes, "origin": self.server_name} + request = { + "method": method.decode("ascii"), + "uri": url_bytes.decode("ascii"), + "origin": self.server_name, + } if destination is not None: - request["destination"] = destination + request["destination"] = destination.decode("ascii") if destination_is is not None: - request["destination_is"] = destination_is + request["destination_is"] = destination_is.decode("ascii") if content is not None: request["content"] = content diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 1676771ef0fb..c6c0e623c16e 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -164,7 +164,6 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): than one caller? Will all of those calling functions have be in a context with an active span? """ - import contextlib import inspect import logging @@ -180,8 +179,8 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"): from synapse.config import ConfigError if TYPE_CHECKING: - from synapse.server import HomeServer from synapse.http.site import SynapseRequest + from synapse.server import HomeServer # Helper class @@ -227,6 +226,7 @@ class _DummyTagNames(object): tags = _DummyTagNames try: from jaeger_client import Config as JaegerConfig + from synapse.logging.scopecontextmanager import LogContextScopeManager except ImportError: JaegerConfig = None # type: ignore diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 13785038ad96..a9269196b3ed 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -22,6 +22,7 @@ from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer +from twisted.python.failure import Failure from synapse.logging.context import LoggingContext, PreserveLoggingContext @@ -212,7 +213,14 @@ def run(): return (yield result) except Exception: - logger.exception("Background process '%s' threw an exception", desc) + # failure.Failure() fishes the original Failure out of our stack, and + # thus gives us a sensible stack trace. + f = Failure() + logger.error( + "Background process '%s' threw an exception", + desc, + exc_info=(f.type, f.value, f.getTracebackObject()), + ) finally: _background_process_in_flight_count.labels(desc).dec() diff --git a/synapse/notifier.py b/synapse/notifier.py index 87c120a59ce1..bd41f7785214 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -83,7 +83,7 @@ def __init__(self, user_id, rooms, current_token, time_now_ms): self.current_token = current_token # The last token for which we should wake up any streams that have a - # token that comes before it. This gets updated everytime we get poked. + # token that comes before it. This gets updated every time we get poked. # We start it at the current token since if we get any streams # that have a token from before we have no idea whether they should be # woken up or not, so lets just wake them up. diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 8e0d3a416d96..2d79ada18933 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -16,7 +16,7 @@ import logging import re -from typing import Pattern +from typing import Any, Dict, List, Pattern, Union from synapse.events import EventBase from synapse.types import UserID @@ -72,13 +72,36 @@ def _test_ineq_condition(condition, number): return False -def tweaks_for_actions(actions): +def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]: + """ + Converts a list of actions into a `tweaks` dict (which can then be passed to + the push gateway). + + This function ignores all actions other than `set_tweak` actions, and treats + absent `value`s as `True`, which agrees with the only spec-defined treatment + of absent `value`s (namely, for `highlight` tweaks). + + Args: + actions: list of actions + e.g. [ + {"set_tweak": "a", "value": "AAA"}, + {"set_tweak": "b", "value": "BBB"}, + {"set_tweak": "highlight"}, + "notify" + ] + + Returns: + dictionary of tweaks for those actions + e.g. {"a": "AAA", "b": "BBB", "highlight": True} + """ tweaks = {} for a in actions: if not isinstance(a, dict): continue - if "set_tweak" in a and "value" in a: - tweaks[a["set_tweak"]] = a["value"] + if "set_tweak" in a: + # value is allowed to be absent in which case the value assumed + # should be True. + tweaks[a["set_tweak"]] = a.get("value", True) return tweaks diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index b1cac901eb92..8cfcdb057394 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -66,7 +66,7 @@ "pymacaroons>=0.13.0", "msgpack>=0.5.2", "phonenumbers>=8.2.0", - "prometheus_client>=0.0.18,<0.8.0", + "prometheus_client>=0.0.18,<0.9.0", # we use attr.validators.deep_iterable, which arrived in 19.1.0 "attrs>=19.1.0", "netaddr>=0.7.18", diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 0843d28d4b15..fb0dd04f8878 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -92,11 +92,11 @@ def __init__(self, hs): # assert here that sub classes don't try and use the name. assert ( "instance_name" not in self.PATH_ARGS - ), "`instance_name` is a reserved paramater name" + ), "`instance_name` is a reserved parameter name" assert ( "instance_name" not in signature(self.__class__._serialize_payload).parameters - ), "`instance_name` is a reserved paramater name" + ), "`instance_name` is a reserved parameter name" assert self.METHOD in ("PUT", "POST", "GET") diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index a7174c4a8fc8..63ef6eb7be06 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -14,11 +14,11 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Optional from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint -from synapse.types import Requester, UserID +from synapse.types import JsonDict, Requester, UserID from synapse.util.distributor import user_joined_room, user_left_room if TYPE_CHECKING: @@ -88,49 +88,54 @@ async def _handle_request(self, request, room_id, user_id): class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): - """Rejects the invite for the user and room. + """Rejects an out-of-band invite we have received from a remote server Request format: - POST /_synapse/replication/remote_reject_invite/:room_id/:user_id + POST /_synapse/replication/remote_reject_invite/:event_id { + "txn_id": ..., "requester": ..., - "remote_room_hosts": [...], "content": { ... } } """ NAME = "remote_reject_invite" - PATH_ARGS = ("room_id", "user_id") + PATH_ARGS = ("invite_event_id",) - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs) - self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() self.member_handler = hs.get_room_member_handler() @staticmethod - def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): + def _serialize_payload( # type: ignore + invite_event_id: str, + txn_id: Optional[str], + requester: Requester, + content: JsonDict, + ): """ Args: - requester(Requester) - room_id (str) - user_id (str) - remote_room_hosts (list[str]): Servers to try and reject via + invite_event_id: ID of the invite to be rejected + txn_id: optional transaction ID supplied by the client + requester: user making the rejection request, according to the access token + content: additional content to include in the rejection event. + Normally an empty dict. """ return { + "txn_id": txn_id, "requester": requester.serialize(), - "remote_room_hosts": remote_room_hosts, "content": content, } - async def _handle_request(self, request, room_id, user_id): + async def _handle_request(self, request, invite_event_id): content = parse_json_object_from_request(request) - remote_room_hosts = content["remote_room_hosts"] + txn_id = content["txn_id"] event_content = content["content"] requester = Requester.deserialize(self.store, content["requester"]) @@ -138,60 +143,14 @@ async def _handle_request(self, request, room_id, user_id): if requester.user: request.authenticated_entity = requester.user.to_string() - logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id) - - try: - event, stream_id = await self.federation_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, user_id, event_content, - ) - event_id = event.event_id - except Exception as e: - # if we were unable to reject the exception, just mark - # it as rejected on our end and plough ahead. - # - # The 'except' clause is very broad, but we need to - # capture everything from DNS failures upwards - # - logger.warning("Failed to reject invite: %s", e) - - stream_id = await self.member_handler.locally_reject_invite( - user_id, room_id - ) - event_id = None + # hopefully we're now on the master, so this won't recurse! + event_id, stream_id = await self.member_handler.remote_reject_invite( + invite_event_id, txn_id, requester, event_content, + ) return 200, {"event_id": event_id, "stream_id": stream_id} -class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint): - """Rejects the invite for the user and room locally. - - Request format: - - POST /_synapse/replication/locally_reject_invite/:room_id/:user_id - - {} - """ - - NAME = "locally_reject_invite" - PATH_ARGS = ("room_id", "user_id") - - def __init__(self, hs: "HomeServer"): - super().__init__(hs) - - self.member_handler = hs.get_room_member_handler() - - @staticmethod - def _serialize_payload(room_id, user_id): - return {} - - async def _handle_request(self, request, room_id, user_id): - logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id) - - stream_id = await self.member_handler.locally_reject_invite(user_id, room_id) - - return 200, {"stream_id": stream_id} - - class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): """Notifies that a user has joined or left the room @@ -245,4 +204,3 @@ def register_servlets(hs, http_server): ReplicationRemoteJoinRestServlet(hs).register(http_server) ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) - ReplicationLocallyRejectInviteRestServlet(hs).register(http_server) diff --git a/synapse/replication/tcp/__init__.py b/synapse/replication/tcp/__init__.py index 523a1358d4e3..1b8718b11daa 100644 --- a/synapse/replication/tcp/__init__.py +++ b/synapse/replication/tcp/__init__.py @@ -25,7 +25,7 @@ * command.py - the definitions of all the valid commands * protocol.py - the TCP protocol classes * resource.py - handles streaming stream updates to replications - * streams/ - the definitons of all the valid streams + * streams/ - the definitions of all the valid streams The general interaction of the classes are: diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index df29732f51a1..4985e40b1ff4 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -33,8 +33,8 @@ from synapse.util.metrics import Measure if TYPE_CHECKING: - from synapse.server import HomeServer from synapse.replication.tcp.handler import ReplicationCommandHandler + from synapse.server import HomeServer logger = logging.getLogger(__name__) diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index ea5937a20cbf..ccc7f1f0d19b 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -18,18 +18,11 @@ allowed to be sent by which side. """ import abc +import json import logging -import platform from typing import Tuple, Type -if platform.python_implementation() == "PyPy": - import json - - _json_encoder = json.JSONEncoder() -else: - import simplejson as json # type: ignore[no-redef] # noqa: F821 - - _json_encoder = json.JSONEncoder(namedtuple_as_object=False) # type: ignore[call-arg] # noqa: F821 +_json_encoder = json.JSONEncoder() logger = logging.getLogger(__name__) @@ -54,7 +47,7 @@ def from_line(cls, line): @abc.abstractmethod def to_line(self) -> str: - """Serialises the comamnd for the wire. Does not include the command + """Serialises the command for the wire. Does not include the command prefix. """ diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index e6a2e2598b66..55b3b7900876 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, TypeVar @@ -149,10 +148,11 @@ def start_replication(self, hs): using TCP. """ if hs.config.redis.redis_enabled: + import txredisapi + from synapse.replication.tcp.redis import ( RedisDirectTcpReplicationClientFactory, ) - import txredisapi logger.info( "Connecting to redis (host=%r port=%r)", diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 4198eece71f5..ca47f5cc88f1 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -317,7 +317,7 @@ def send_command(self, cmd, do_buffer=True): def _queue_command(self, cmd): """Queue the command until the connection is ready to write to again. """ - logger.debug("[%s] Queing as conn %r, cmd: %r", self.id(), self.state, cmd) + logger.debug("[%s] Queueing as conn %r, cmd: %r", self.id(), self.state, cmd) self.pending_commands.append(cmd) if len(self.pending_commands) > self.max_line_buffer: diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index e776b6318307..0a7e7f67be74 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -177,7 +177,7 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory): Args: hs outbound_redis_connection: A connection to redis that will be used to - send outbound commands (this is seperate to the redis connection + send outbound commands (this is separate to the redis connection used to subscribe). """ diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index f196eff0720b..9076bbe9f134 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -198,26 +198,6 @@ def current_token_without_instance( return lambda instance_name: current_token() -def db_query_to_update_function( - query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]] -) -> UpdateFunction: - """Wraps a db query function which returns a list of rows to make it - suitable for use as an `update_function` for the Stream class - """ - - async def update_function(instance_name, from_token, upto_token, limit): - rows = await query_function(from_token, upto_token, limit) - updates = [(row[0], row[1:]) for row in rows] - limited = False - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited - - return update_function - - def make_http_update_function(hs, stream_name: str) -> UpdateFunction: """Makes a suitable function for use as an `update_function` that queries the master process for updates. @@ -393,7 +373,7 @@ def __init__(self, hs): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_pushers_stream_token), - db_query_to_update_function(store.get_all_updated_pushers_rows), + store.get_all_updated_pushers_rows, ) @@ -421,26 +401,12 @@ class CachesStreamRow: ROW_TYPE = CachesStreamRow def __init__(self, hs): - self.store = hs.get_datastore() + store = hs.get_datastore() super().__init__( hs.get_instance_name(), - self.store.get_cache_stream_token, - self._update_function, - ) - - async def _update_function( - self, instance_name: str, from_token: int, upto_token: int, limit: int - ): - rows = await self.store.get_all_updated_caches( - instance_name, from_token, upto_token, limit + store.get_cache_stream_token, + store.get_all_updated_caches, ) - updates = [(row[0], row[1:]) for row in rows] - limited = False - if len(updates) >= limit: - upto_token = updates[-1][0] - limited = True - - return updates, upto_token, limited class PublicRoomsStream(Stream): @@ -465,7 +431,7 @@ def __init__(self, hs): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_current_public_room_stream_id), - db_query_to_update_function(store.get_all_new_public_rooms), + store.get_all_new_public_rooms, ) @@ -486,7 +452,7 @@ def __init__(self, hs): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_device_stream_token), - db_query_to_update_function(store.get_all_device_list_changes_for_remotes), + store.get_all_device_list_changes_for_remotes, ) @@ -504,7 +470,7 @@ def __init__(self, hs): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_to_device_stream_token), - db_query_to_update_function(store.get_all_new_device_messages), + store.get_all_new_device_messages, ) @@ -524,7 +490,7 @@ def __init__(self, hs): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_max_account_data_stream_id), - db_query_to_update_function(store.get_all_updated_tags), + store.get_all_updated_tags, ) @@ -612,7 +578,7 @@ def __init__(self, hs): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_group_stream_token), - db_query_to_update_function(store.get_all_groups_changes), + store.get_all_groups_changes, ) @@ -630,7 +596,5 @@ def __init__(self, hs): super().__init__( hs.get_instance_name(), current_token_without_instance(store.get_device_stream_token), - db_query_to_update_function( - store.get_all_user_signature_changes_for_remotes - ), + store.get_all_user_signature_changes_for_remotes, ) diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index f3703903314d..1c2a4cce7f8e 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import heapq from collections import Iterable from typing import List, Tuple, Type @@ -22,7 +21,6 @@ from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance - """Handling of the 'events' replication stream This stream contains rows of various types. Each row therefore contains a 'type' @@ -64,7 +62,7 @@ class BaseEventsStreamRow(object): Specifies how to identify, serialize and deserialize the different types. """ - # Unique string that ids the type. Must be overriden in sub classes. + # Unique string that ids the type. Must be overridden in sub classes. TypeId = None # type: str @classmethod diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index f6eef7afee1b..64d5c58b65f3 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Awaitable, Callable, Dict, Optional from synapse.api.errors import Codes, LoginError, SynapseError from synapse.api.ratelimiting import Ratelimiter @@ -26,7 +27,7 @@ from synapse.http.site import SynapseRequest from synapse.rest.client.v2_alpha._base import client_patterns from synapse.rest.well_known import WellKnownBuilder -from synapse.types import UserID +from synapse.types import JsonDict, UserID from synapse.util.msisdn import phone_number_to_msisdn from synapse.util.threepids import canonicalise_email @@ -114,7 +115,7 @@ def __init__(self, hs): burst_count=self.hs.config.rc_login_failed_attempts.burst_count, ) - def on_GET(self, request): + def on_GET(self, request: SynapseRequest): flows = [] if self.jwt_enabled: flows.append({"type": LoginRestServlet.JWT_TYPE}) @@ -142,10 +143,10 @@ def on_GET(self, request): return 200, {"flows": flows} - def on_OPTIONS(self, request): + def on_OPTIONS(self, request: SynapseRequest): return 200, {} - async def on_POST(self, request): + async def on_POST(self, request: SynapseRequest): self._address_ratelimiter.ratelimit(request.getClientIP()) login_submission = parse_json_object_from_request(request) @@ -154,9 +155,9 @@ async def on_POST(self, request): login_submission["type"] == LoginRestServlet.JWT_TYPE or login_submission["type"] == LoginRestServlet.JWT_TYPE_DEPRECATED ): - result = await self.do_jwt_login(login_submission) + result = await self._do_jwt_login(login_submission) elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE: - result = await self.do_token_login(login_submission) + result = await self._do_token_login(login_submission) else: result = await self._do_other_login(login_submission) except KeyError: @@ -167,14 +168,14 @@ async def on_POST(self, request): result["well_known"] = well_known_data return 200, result - async def _do_other_login(self, login_submission): + async def _do_other_login(self, login_submission: JsonDict) -> Dict[str, str]: """Handle non-token/saml/jwt logins Args: login_submission: Returns: - dict: HTTP response + HTTP response """ # Log the request we got, but only certain fields to minimise the chance of # logging someone's password (even if they accidentally put it in the wrong @@ -292,25 +293,30 @@ async def _do_other_login(self, login_submission): return result async def _complete_login( - self, user_id, login_submission, callback=None, create_non_existent_users=False - ): + self, + user_id: str, + login_submission: JsonDict, + callback: Optional[ + Callable[[Dict[str, str]], Awaitable[Dict[str, str]]] + ] = None, + create_non_existent_users: bool = False, + ) -> Dict[str, str]: """Called when we've successfully authed the user and now need to actually login them in (e.g. create devices). This gets called on - all succesful logins. + all successful logins. - Applies the ratelimiting for succesful login attempts against an + Applies the ratelimiting for successful login attempts against an account. Args: - user_id (str): ID of the user to register. - login_submission (dict): Dictionary of login information. - callback (func|None): Callback function to run after registration. - create_non_existent_users (bool): Whether to create the user if - they don't exist. Defaults to False. + user_id: ID of the user to register. + login_submission: Dictionary of login information. + callback: Callback function to run after registration. + create_non_existent_users: Whether to create the user if they don't + exist. Defaults to False. Returns: - result (Dict[str,str]): Dictionary of account information after - successful registration. + result: Dictionary of account information after successful registration. """ # Before we actually log them in we check if they've already logged in @@ -344,7 +350,7 @@ async def _complete_login( return result - async def do_token_login(self, login_submission): + async def _do_token_login(self, login_submission: JsonDict) -> Dict[str, str]: token = login_submission["token"] auth_handler = self.auth_handler user_id = await auth_handler.validate_short_term_login_token_and_get_user_id( @@ -354,7 +360,7 @@ async def do_token_login(self, login_submission): result = await self._complete_login(user_id, login_submission) return result - async def do_jwt_login(self, login_submission): + async def _do_jwt_login(self, login_submission: JsonDict) -> Dict[str, str]: token = login_submission.get("token", None) if token is None: raise LoginError( diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index bbf98a5cfec3..01b80b86fade 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -217,10 +217,8 @@ async def on_PUT(self, request, room_id, event_type, state_key, txn_id=None): ) event_id = event.event_id - ret = {} # type: dict - if event_id: - set_tag("event_id", event_id) - ret = {"event_id": event_id} + set_tag("event_id", event_id) + ret = {"event_id": event_id} return 200, ret diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py index 747d46eac201..50277c6cf6c3 100644 --- a/synapse/rest/client/v1/voip.py +++ b/synapse/rest/client/v1/voip.py @@ -50,7 +50,7 @@ async def on_GET(self, request): # We need to use standard padded base64 encoding here # encode_base64 because we need to add the standard padding to get the # same result as the TURN server. - password = base64.b64encode(mac.digest()) + password = base64.b64encode(mac.digest()).decode("ascii") elif turnUris and turnUsername and turnPassword and userLifetime: username = turnUsername diff --git a/synapse/rest/media/v1/thumbnailer.py b/synapse/rest/media/v1/thumbnailer.py index c234ea74212f..7126997134d2 100644 --- a/synapse/rest/media/v1/thumbnailer.py +++ b/synapse/rest/media/v1/thumbnailer.py @@ -12,11 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from io import BytesIO -import PIL.Image as Image +from PIL import Image as Image logger = logging.getLogger(__name__) diff --git a/synapse/secrets.py b/synapse/secrets.py index 0b327a0f8233..5f43f81eb0fd 100644 --- a/synapse/secrets.py +++ b/synapse/secrets.py @@ -19,7 +19,6 @@ See https://docs.python.org/3/library/secrets.html#module-secrets for the API used in Python 3.6, and the API emulated in Python 2.7. """ - import sys # secrets is available since python 3.6 @@ -31,8 +30,8 @@ def Secrets(): else: - import os import binascii + import os class Secrets(object): def token_bytes(self, nbytes=32): diff --git a/synapse/server.py b/synapse/server.py index d14b6b722cc4..f1078a380589 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -234,6 +234,8 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar self._reactor = reactor self.hostname = hostname + # the key we use to sign events and requests + self.signing_key = config.key.signing_key[0] self.config = config self._building = {} self._listening_services = [] diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index d30766e543e9..f39f556c2098 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -16,7 +16,7 @@ import itertools import logging -from typing import Any, Iterable, Optional, Tuple +from typing import Any, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes from synapse.replication.tcp.streams import BackfillStream, CachesStream @@ -46,13 +46,30 @@ def __init__(self, database: Database, db_conn, hs): async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int - ): - """Fetches cache invalidation rows between the two given IDs written - by the given instance. Returns at most `limit` rows. + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for caches replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ if last_id == current_id: - return [] + return [], current_id, False def get_all_updated_caches_txn(txn): # We purposefully don't bound by the current token, as we want to @@ -66,7 +83,14 @@ def get_all_updated_caches_txn(txn): LIMIT ? """ txn.execute(sql, (last_id, instance_name, limit)) - return txn.fetchall() + updates = [(row[0], row[1:]) for row in txn] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited return await self.db.runInteraction( "get_all_updated_caches", get_all_updated_caches_txn diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index 9a1178fb3947..d313b9705f79 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import List, Tuple from canonicaljson import json @@ -207,31 +208,46 @@ def delete_messages_for_remote_destination_txn(txn): "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn ) - def get_all_new_device_messages(self, last_pos, current_pos, limit): - """ + async def get_all_new_device_messages( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for to device replication stream. + Args: - last_pos(int): - current_pos(int): - limit(int): + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + Returns: - A deferred list of rows from the device inbox + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ - if last_pos == current_pos: - return defer.succeed([]) + + if last_id == current_id: + return [], current_id, False def get_all_new_device_messages_txn(txn): # We limit like this as we might have multiple rows per stream_id, and # we want to make sure we always get all entries for any stream_id # we return. - upper_pos = min(current_pos, last_pos + limit) + upper_pos = min(current_id, last_id + limit) sql = ( "SELECT max(stream_id), user_id" " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY user_id" ) - txn.execute(sql, (last_pos, upper_pos)) - rows = txn.fetchall() + txn.execute(sql, (last_id, upper_pos)) + updates = [(row[0], row[1:]) for row in txn] sql = ( "SELECT max(stream_id), destination" @@ -239,15 +255,21 @@ def get_all_new_device_messages_txn(txn): " WHERE ? < stream_id AND stream_id <= ?" " GROUP BY destination" ) - txn.execute(sql, (last_pos, upper_pos)) - rows.extend(txn) + txn.execute(sql, (last_id, upper_pos)) + updates.extend((row[0], row[1:]) for row in txn) # Order by ascending stream ordering - rows.sort() + updates.sort() - return rows + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True - return self.db.runInteraction( + return updates, upto_token, limited + + return await self.db.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn ) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 0ff054245309..343cf9a2d5f2 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -582,32 +582,58 @@ def get_users_whose_signatures_changed(self, user_id, from_key): return set() async def get_all_device_list_changes_for_remotes( - self, from_key: int, to_key: int, limit: int, - ) -> List[Tuple[int, str]]: - """Return a list of `(stream_id, entity)` which is the combined list of - changes to devices and which destinations need to be poked. Entity is - either a user ID (starting with '@') or a remote destination. - """ + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for device lists replication stream. - # This query Does The Right Thing where it'll correctly apply the - # bounds to the inner queries. - sql = """ - SELECT stream_id, entity FROM ( - SELECT stream_id, user_id AS entity FROM device_lists_stream - UNION ALL - SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes - ) AS e - WHERE ? < stream_id AND stream_id <= ? - LIMIT ? + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ - return await self.db.execute( + if last_id == current_id: + return [], current_id, False + + def _get_all_device_list_changes_for_remotes(txn): + # This query Does The Right Thing where it'll correctly apply the + # bounds to the inner queries. + sql = """ + SELECT stream_id, entity FROM ( + SELECT stream_id, user_id AS entity FROM device_lists_stream + UNION ALL + SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes + ) AS e + WHERE ? < stream_id AND stream_id <= ? + LIMIT ? + """ + + txn.execute(sql, (last_id, current_id, limit)) + updates = [(row[0], row[1:]) for row in txn] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db.runInteraction( "get_all_device_list_changes_for_remotes", - None, - sql, - from_key, - to_key, - limit, + _get_all_device_list_changes_for_remotes, ) @cached(max_entries=10000) diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 1a0842d4b0c2..6c3cff82e1e4 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -14,7 +14,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List +from typing import Dict, List, Tuple from canonicaljson import encode_canonical_json, json @@ -479,34 +479,61 @@ def get_e2e_cross_signing_keys_bulk( return result - def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit): - """Return a list of changes from the user signature stream to notify remotes. + async def get_all_user_signature_changes_for_remotes( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for groups replication stream. + Note that the user signature stream represents when a user signs their device with their user-signing key, which is not published to other users or servers, so no `destination` is needed in the returned list. However, this is needed to poke workers. Args: - from_key (int): the stream ID to start at (exclusive) - to_key (int): the stream ID to end at (inclusive) + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. Returns: - Deferred[list[(int,str)]] a list of `(stream_id, user_id)` - """ - sql = """ - SELECT stream_id, from_user_id AS user_id - FROM user_signature_stream - WHERE ? < stream_id AND stream_id <= ? - ORDER BY stream_id ASC - LIMIT ? + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ - return self.db.execute( + + if last_id == current_id: + return [], current_id, False + + def _get_all_user_signature_changes_for_remotes_txn(txn): + sql = """ + SELECT stream_id, from_user_id AS user_id + FROM user_signature_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) + + updates = [(row[0], (row[1:])) for row in txn] + + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db.runInteraction( "get_all_user_signature_changes_for_remotes", - None, - sql, - from_key, - to_key, - limit, + _get_all_user_signature_changes_for_remotes_txn, ) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index cfd24d2f061d..230fb5cd7f29 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -14,7 +14,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import itertools import logging from collections import OrderedDict, namedtuple @@ -28,12 +27,7 @@ from twisted.internet import defer import synapse.metrics -from synapse.api.constants import ( - EventContentFields, - EventTypes, - Membership, - RelationTypes, -) +from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.room_versions import RoomVersions from synapse.crypto.event_signing import compute_event_reference_hash from synapse.events import EventBase # noqa: F401 @@ -48,8 +42,8 @@ from synapse.util.iterutils import batch_iter if TYPE_CHECKING: - from synapse.storage.data_stores.main import DataStore from synapse.server import HomeServer + from synapse.storage.data_stores.main import DataStore logger = logging.getLogger(__name__) @@ -820,7 +814,6 @@ def _delete_existing_rows_txn(cls, txn, events_and_contexts): "event_reference_hashes", "event_search", "event_to_state_groups", - "local_invites", "state_events", "rejections", "redactions", @@ -1197,65 +1190,27 @@ def _store_room_members_txn(self, txn, events, backfilled): (event.state_key,), ) - # We update the local_invites table only if the event is "current", - # i.e., its something that has just happened. If the event is an - # outlier it is only current if its an "out of band membership", - # like a remote invite or a rejection of a remote invite. - is_new_state = not backfilled and ( - not event.internal_metadata.is_outlier() - or event.internal_metadata.is_out_of_band_membership() - ) - is_mine = self.is_mine_id(event.state_key) - if is_new_state and is_mine: - if event.membership == Membership.INVITE: - self.db.simple_insert_txn( - txn, - table="local_invites", - values={ - "event_id": event.event_id, - "invitee": event.state_key, - "inviter": event.sender, - "room_id": event.room_id, - "stream_id": event.internal_metadata.stream_ordering, - }, - ) - else: - sql = ( - "UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - txn.execute( - sql, - ( - event.internal_metadata.stream_ordering, - event.event_id, - event.room_id, - event.state_key, - ), - ) - - # We also update the `local_current_membership` table with - # latest invite info. This will usually get updated by the - # `current_state_events` handling, unless its an outlier. - if event.internal_metadata.is_outlier(): - # This should only happen for out of band memberships, so - # we add a paranoia check. - assert event.internal_metadata.is_out_of_band_membership() - - self.db.simple_upsert_txn( - txn, - table="local_current_membership", - keyvalues={ - "room_id": event.room_id, - "user_id": event.state_key, - }, - values={ - "event_id": event.event_id, - "membership": event.membership, - }, - ) + # We update the local_current_membership table only if the event is + # "current", i.e., its something that has just happened. + # + # This will usually get updated by the `current_state_events` handling, + # unless its an outlier, and an outlier is only "current" if it's an "out of + # band membership", like a remote invite or a rejection of a remote invite. + if ( + self.is_mine_id(event.state_key) + and not backfilled + and event.internal_metadata.is_outlier() + and event.internal_metadata.is_out_of_band_membership() + ): + self.db.simple_upsert_txn( + txn, + table="local_current_membership", + keyvalues={"room_id": event.room_id, "user_id": event.state_key}, + values={ + "event_id": event.event_id, + "membership": event.membership, + }, + ) def _handle_event_relations(self, txn, event): """Handles inserting relation data during peristence of events @@ -1586,31 +1541,3 @@ def _update_backward_extremeties(self, txn, events): if not ev.internal_metadata.is_outlier() ], ) - - async def locally_reject_invite(self, user_id: str, room_id: str) -> int: - """Mark the invite has having been rejected even though we failed to - create a leave event for it. - """ - - sql = ( - "UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE" - " room_id = ? AND invitee = ? AND locally_rejected is NULL" - " AND replaced_by is NULL" - ) - - def f(txn, stream_ordering): - txn.execute(sql, (stream_ordering, True, room_id, user_id)) - - # We also clear this entry from `local_current_membership`. - # Ideally we'd point to a leave event, but we don't have one, so - # nevermind. - self.db.simple_delete_txn( - txn, - table="local_current_membership", - keyvalues={"room_id": room_id, "user_id": user_id}, - ) - - with self._stream_id_gen.get_next() as stream_ordering: - await self.db.runInteraction("locally_reject_invite", f, stream_ordering) - - return stream_ordering diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 47a3e63589d0..01cad7d4faa2 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -82,10 +82,7 @@ def __init__(self, database: Database, db_conn, hs): # We are the process in charge of generating stream ids for events, # so instantiate ID generators based on the database self._stream_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - extra_tables=[("local_invites", "stream_id")], + db_conn, "events", "stream_ordering", ) self._backfill_id_gen = StreamIdGenerator( db_conn, diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py index fb1361f1c197..4fb9f9850c79 100644 --- a/synapse/storage/data_stores/main/group_server.py +++ b/synapse/storage/data_stores/main/group_server.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Tuple + from canonicaljson import json from twisted.internet import defer @@ -526,13 +528,35 @@ def _get_groups_changes_for_user_txn(txn): "get_groups_changes_for_user", _get_groups_changes_for_user_txn ) - def get_all_groups_changes(self, from_token, to_token, limit): - from_token = int(from_token) - has_changed = self._group_updates_stream_cache.has_any_entity_changed( - from_token - ) + async def get_all_groups_changes( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for groups replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + + last_id = int(last_id) + has_changed = self._group_updates_stream_cache.has_any_entity_changed(last_id) + if not has_changed: - return defer.succeed([]) + return [], current_id, False def _get_all_groups_changes_txn(txn): sql = """ @@ -541,13 +565,21 @@ def _get_all_groups_changes_txn(txn): WHERE ? < stream_id AND stream_id <= ? LIMIT ? """ - txn.execute(sql, (from_token, to_token, limit)) - return [ - (stream_id, group_id, user_id, gtype, json.loads(content_json)) + txn.execute(sql, (last_id, current_id, limit)) + updates = [ + (stream_id, (group_id, user_id, gtype, json.loads(content_json))) for stream_id, group_id, user_id, gtype, content_json in txn ] - return self.db.runInteraction( + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True + + return updates, upto_token, limited + + return await self.db.runInteraction( "get_all_groups_changes", _get_all_groups_changes_txn ) diff --git a/synapse/storage/data_stores/main/purge_events.py b/synapse/storage/data_stores/main/purge_events.py index a93e1ef19863..65465691391d 100644 --- a/synapse/storage/data_stores/main/purge_events.py +++ b/synapse/storage/data_stores/main/purge_events.py @@ -361,7 +361,6 @@ def _purge_room_txn(self, txn, room_id): "event_push_summary", "pusher_throttle", "group_summary_rooms", - "local_invites", "room_account_data", "room_tags", "local_current_membership", diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py index 547b9d69cb29..546101624094 100644 --- a/synapse/storage/data_stores/main/pusher.py +++ b/synapse/storage/data_stores/main/pusher.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from typing import Iterable, Iterator +from typing import Iterable, Iterator, List, Tuple from canonicaljson import encode_canonical_json, json @@ -98,77 +98,69 @@ def get_pushers(txn): rows = yield self.db.runInteraction("get_all_pushers", get_pushers) return rows - def get_all_updated_pushers(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed(([], [])) - - def get_all_updated_pushers_txn(txn): - sql = ( - "SELECT id, user_name, access_token, profile_tag, kind," - " app_id, app_display_name, device_display_name, pushkey, ts," - " lang, data" - " FROM pushers" - " WHERE ? < id AND id <= ?" - " ORDER BY id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - updated = txn.fetchall() - - sql = ( - "SELECT stream_id, user_id, app_id, pushkey" - " FROM deleted_pushers" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - deleted = txn.fetchall() + async def get_all_updated_pushers_rows( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for pushers replication stream. - return updated, deleted + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. - return self.db.runInteraction( - "get_all_updated_pushers", get_all_updated_pushers_txn - ) + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. - def get_all_updated_pushers_rows(self, last_id, current_id, limit): - """Get all the pushers that have changed between the given tokens. + The token returned can be used in a subsequent call to this + function to get further updatees. - Returns: - Deferred(list(tuple)): each tuple consists of: - stream_id (str) - user_id (str) - app_id (str) - pushkey (str) - was_deleted (bool): whether the pusher was added/updated (False) - or deleted (True) + The updates are a list of 2-tuples of stream ID and the row data """ if last_id == current_id: - return defer.succeed([]) + return [], current_id, False def get_all_updated_pushers_rows_txn(txn): - sql = ( - "SELECT id, user_name, app_id, pushkey" - " FROM pushers" - " WHERE ? < id AND id <= ?" - " ORDER BY id ASC LIMIT ?" - ) + sql = """ + SELECT id, user_name, app_id, pushkey + FROM pushers + WHERE ? < id AND id <= ? + ORDER BY id ASC LIMIT ? + """ txn.execute(sql, (last_id, current_id, limit)) - results = [list(row) + [False] for row in txn] - - sql = ( - "SELECT stream_id, user_id, app_id, pushkey" - " FROM deleted_pushers" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) + updates = [ + (stream_id, (user_name, app_id, pushkey, False)) + for stream_id, user_name, app_id, pushkey in txn + ] + + sql = """ + SELECT stream_id, user_id, app_id, pushkey + FROM deleted_pushers + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC LIMIT ? + """ txn.execute(sql, (last_id, current_id, limit)) + updates.extend( + (stream_id, (user_name, app_id, pushkey, True)) + for stream_id, user_name, app_id, pushkey in txn + ) + + updates.sort() # Sort so that they're ordered by stream id - results.extend(list(row) + [True] for row in txn) - results.sort() # Sort so that they're ordered by stream id + limited = False + upper_bound = current_id + if len(updates) >= limit: + limited = True + upper_bound = updates[-1][0] - return results + return updates, upper_bound, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn ) diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index d72d6affb8d3..98a8b5a11aeb 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -826,7 +826,32 @@ def _quarantine_media_txn( return total_media_quarantined - def get_all_new_public_rooms(self, prev_id, current_id, limit): + async def get_all_new_public_rooms( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for public rooms replication stream. + + Args: + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + + Returns: + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data + """ + if last_id == current_id: + return [], current_id, False + def get_all_new_public_rooms(txn): sql = """ SELECT stream_id, room_id, visibility, appservice_id, network_id @@ -836,13 +861,17 @@ def get_all_new_public_rooms(txn): LIMIT ? """ - txn.execute(sql, (prev_id, current_id, limit)) - return txn.fetchall() + txn.execute(sql, (last_id, current_id, limit)) + updates = [(row[0], row[1:]) for row in txn] + limited = False + upto_token = current_id + if len(updates) >= limit: + upto_token = updates[-1][0] + limited = True - if prev_id == current_id: - return defer.succeed([]) + return updates, upto_token, limited - return self.db.runInteraction( + return await self.db.runInteraction( "get_all_new_public_rooms", get_all_new_public_rooms ) diff --git a/synapse/storage/data_stores/main/schema/delta/25/fts.py b/synapse/storage/data_stores/main/schema/delta/25/fts.py index 4b2ffd35fdca..ee675e71ffcf 100644 --- a/synapse/storage/data_stores/main/schema/delta/25/fts.py +++ b/synapse/storage/data_stores/main/schema/delta/25/fts.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging -import simplejson - from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.prepare_database import get_statements @@ -66,7 +64,7 @@ def run_create(cur, database_engine, *args, **kwargs): "max_stream_id_exclusive": max_stream_id + 1, "rows_inserted": 0, } - progress_json = simplejson.dumps(progress) + progress_json = json.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/data_stores/main/schema/delta/27/ts.py b/synapse/storage/data_stores/main/schema/delta/27/ts.py index 414f9f5aa0a6..b7972cfa8ea3 100644 --- a/synapse/storage/data_stores/main/schema/delta/27/ts.py +++ b/synapse/storage/data_stores/main/schema/delta/27/ts.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging -import simplejson - from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -45,7 +43,7 @@ def run_create(cur, database_engine, *args, **kwargs): "max_stream_id_exclusive": max_stream_id + 1, "rows_inserted": 0, } - progress_json = simplejson.dumps(progress) + progress_json = json.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/data_stores/main/schema/delta/31/search_update.py b/synapse/storage/data_stores/main/schema/delta/31/search_update.py index 7d8ca5f93f9d..63b757ade6dc 100644 --- a/synapse/storage/data_stores/main/schema/delta/31/search_update.py +++ b/synapse/storage/data_stores/main/schema/delta/31/search_update.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging -import simplejson - from synapse.storage.engines import PostgresEngine from synapse.storage.prepare_database import get_statements @@ -50,7 +48,7 @@ def run_create(cur, database_engine, *args, **kwargs): "rows_inserted": 0, "have_added_indexes": False, } - progress_json = simplejson.dumps(progress) + progress_json = json.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/data_stores/main/schema/delta/33/event_fields.py b/synapse/storage/data_stores/main/schema/delta/33/event_fields.py index bff1256a7b73..a3e81eeac70a 100644 --- a/synapse/storage/data_stores/main/schema/delta/33/event_fields.py +++ b/synapse/storage/data_stores/main/schema/delta/33/event_fields.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import json import logging -import simplejson - from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -45,7 +43,7 @@ def run_create(cur, database_engine, *args, **kwargs): "max_stream_id_exclusive": max_stream_id + 1, "rows_inserted": 0, } - progress_json = simplejson.dumps(progress) + progress_json = json.dumps(progress) sql = ( "INSERT into background_updates (update_name, progress_json)" diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py index f8c776be3f74..290317fd9457 100644 --- a/synapse/storage/data_stores/main/tags.py +++ b/synapse/storage/data_stores/main/tags.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from typing import List, Tuple from canonicaljson import json @@ -53,18 +54,32 @@ def tags_by_room(rows): return deferred - @defer.inlineCallbacks - def get_all_updated_tags(self, last_id, current_id, limit): - """Get all the client tags that have changed on the server + async def get_all_updated_tags( + self, instance_name: str, last_id: int, current_id: int, limit: int + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: + """Get updates for tags replication stream. + Args: - last_id(int): The position to fetch from. - current_id(int): The position to fetch up to. + instance_name: The writer we want to fetch updates from. Unused + here since there is only ever one writer. + last_id: The token to fetch updates from. Exclusive. + current_id: The token to fetch updates up to. Inclusive. + limit: The requested limit for the number of rows to return. The + function may return more or fewer rows. + Returns: - A deferred list of tuples of stream_id int, user_id string, - room_id string, tag string and content string. + A tuple consisting of: the updates, a token to use to fetch + subsequent updates, and whether we returned fewer rows than exists + between the requested tokens due to the limit. + + The token returned can be used in a subsequent call to this + function to get further updatees. + + The updates are a list of 2-tuples of stream ID and the row data """ + if last_id == current_id: - return [] + return [], current_id, False def get_all_updated_tags_txn(txn): sql = ( @@ -76,7 +91,7 @@ def get_all_updated_tags_txn(txn): txn.execute(sql, (last_id, current_id, limit)) return txn.fetchall() - tag_ids = yield self.db.runInteraction( + tag_ids = await self.db.runInteraction( "get_all_updated_tags", get_all_updated_tags_txn ) @@ -89,21 +104,27 @@ def get_tag_content(txn, tag_ids): for tag, content in txn: tags.append(json.dumps(tag) + ":" + content) tag_json = "{" + ",".join(tags) + "}" - results.append((stream_id, user_id, room_id, tag_json)) + results.append((stream_id, (user_id, room_id, tag_json))) return results batch_size = 50 results = [] for i in range(0, len(tag_ids), batch_size): - tags = yield self.db.runInteraction( + tags = await self.db.runInteraction( "get_all_updated_tag_content", get_tag_content, tag_ids[i : i + batch_size], ) results.extend(tags) - return results + limited = False + upto_token = current_id + if len(results) >= limit: + upto_token = results[-1][0] + limited = True + + return results, upto_token, limited @defer.inlineCallbacks def get_updated_tags(self, user_id, stream_id): diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py index ec2f38c37357..4c044b1a1549 100644 --- a/synapse/storage/data_stores/main/ui_auth.py +++ b/synapse/storage/data_stores/main/ui_auth.py @@ -17,10 +17,10 @@ import attr -import synapse.util.stringutils as stringutils from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore from synapse.types import JsonDict +from synapse.util import stringutils as stringutils @attr.s diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index ec894a91cb14..fa4604167666 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -783,9 +783,3 @@ async def _handle_potentially_left_users(self, user_ids: Set[str]): for user_id in left_users: await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id) - - async def locally_reject_invite(self, user_id: str, room_id: str) -> int: - """Mark the invite has having been rejected even though we failed to - create a leave event for it. - """ - return await self.persist_events_store.locally_reject_invite(user_id, room_id) diff --git a/synapse/storage/types.py b/synapse/storage/types.py index daff81c5ee23..2d2b560e748e 100644 --- a/synapse/storage/types.py +++ b/synapse/storage/types.py @@ -12,12 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from typing import Any, Iterable, Iterator, List, Tuple from typing_extensions import Protocol - """ Some very basic protocol definitions for the DB-API2 classes specified in PEP-249 """ diff --git a/synapse/streams/config.py b/synapse/streams/config.py index cd56cd91ed99..ca7c16ff65c1 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -68,13 +68,13 @@ def from_request(cls, request, raise_invalid_params=True, default_limit=None): elif from_tok: from_tok = StreamToken.from_string(from_tok) except Exception: - raise SynapseError(400, "'from' paramater is invalid") + raise SynapseError(400, "'from' parameter is invalid") try: if to_tok: to_tok = StreamToken.from_string(to_tok) except Exception: - raise SynapseError(400, "'to' paramater is invalid") + raise SynapseError(400, "'to' parameter is invalid") limit = parse_integer(request, "limit", default=default_limit) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index fcd2aaa9c90f..5d3eddcfdc3e 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -68,7 +68,7 @@ def get_current_token_for_pagination(self): The returned token does not have the current values for fields other than `room`, since they are not used during pagination. - Retuns: + Returns: Deferred[StreamToken] """ token = StreamToken( diff --git a/synapse/types.py b/synapse/types.py index 9f28f9a19260..40cab6ed747d 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -31,7 +31,7 @@ if sys.version_info[:3] >= (3, 6, 0): from typing import Collection else: - from typing import Sized, Iterable, Container + from typing import Container, Iterable, Sized T_co = TypeVar("T_co", covariant=True) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 60f0de70f7b3..c63256d3bd04 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -55,7 +55,7 @@ def time(self): return self._reactor.seconds() def time_msec(self): - """Returns the current system time in miliseconds since epoch.""" + """Returns the current system time in milliseconds since epoch.""" return int(self.time() * 1000) def looping_call(self, f, msec, *args, **kwargs): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 65abf0846e92..f562770922d0 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -352,7 +352,7 @@ class ReadWriteLock(object): # resolved when they release the lock). # # Read: We know its safe to acquire a read lock when the latest writer has - # been resolved. The new reader is appeneded to the list of latest readers. + # been resolved. The new reader is appended to the list of latest readers. # # Write: We know its safe to acquire the write lock when both the latest # writers and readers have been resolved. The new writer replaces the latest diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 64f35fc288d2..9b09c08b8981 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -516,7 +516,7 @@ def __init__( """ Args: orig (function) - cached_method_name (str): The name of the chached method. + cached_method_name (str): The name of the cached method. list_name (str): Name of the argument which is the bulk lookup list num_args (int): number of positional arguments (excluding ``self``, but including list_name) to use as cache keys. Defaults to all diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 45af8d3eeb03..da20523b7092 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -39,7 +39,7 @@ class Distributor(object): Signals are named simply by strings. TODO(paul): It would be nice to give signals stronger object identities, - so we can attach metadata, docstrings, detect typoes, etc... But this + so we can attach metadata, docstrings, detect typos, etc... But this model will do for today. """ diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index 2605f3c65b85..54c046b6e1cd 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -192,7 +192,7 @@ def check_yield_points_inner(*args, **kwargs): result = yield d except Exception: # this will fish an earlier Failure out of the stack where possible, and - # thus is preferable to passing in an exeception to the Failure + # thus is preferable to passing in an exception to the Failure # constructor, since it results in less stack-mangling. result = Failure() diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index af6958719632..8794317caab1 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -22,7 +22,7 @@ logger = logging.getLogger(__name__) -# the intial backoff, after the first transaction fails +# the initial backoff, after the first transaction fails MIN_RETRY_INTERVAL = 10 * 60 * 1000 # how much we multiply the backoff by after each subsequent fail @@ -174,7 +174,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): # has been decommissioned. # If we get a 401, then we should probably back off since they # won't accept our requests for at least a while. - # 429 is us being aggresively rate limited, so lets rate limit + # 429 is us being aggressively rate limited, so lets rate limit # ourselves. if exc_val.code == 404 and self.backoff_on_404: valid_err_code = False diff --git a/synapse/visibility.py b/synapse/visibility.py index 3dfd4af26c02..0f042c5696bf 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -319,7 +319,7 @@ def check_event_is_visible(event, state): return True # Lets check to see if all the events have a history visibility - # of "shared" or "world_readable". If thats the case then we don't + # of "shared" or "world_readable". If that's the case then we don't # need to check membership (as we know the server is in the room). event_to_state_ids = yield storage.state.get_state_ids_for_events( frozenset(e.event_id for e in events), @@ -335,7 +335,7 @@ def check_event_is_visible(event, state): visibility_ids.add(hist) # If we failed to find any history visibility events then the default - # is "shared" visiblity. + # is "shared" visibility. if not visibility_ids: all_open = True else: diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 70c8e7230343..f9ce609923ec 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -192,7 +192,7 @@ def test_verify_json_for_server(self): d = _verify_json_for_server(kr, "server9", {}, 0, "test unsigned") self.failureResultOf(d, SynapseError) - # should suceed on a signed object + # should succeed on a signed object d = _verify_json_for_server(kr, "server9", json1, 500, "test signed") # self.assertFalse(d.called) self.get_success(d) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index ba7148ec0181..ebabe9a7d64b 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -32,10 +32,11 @@ def setUp(self): self.mock_as_api = Mock() self.mock_scheduler = Mock() hs = Mock() - hs.get_datastore = Mock(return_value=self.mock_store) - self.mock_store.get_received_ts.return_value = 0 - hs.get_application_service_api = Mock(return_value=self.mock_as_api) - hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler) + hs.get_datastore.return_value = self.mock_store + self.mock_store.get_received_ts.return_value = defer.succeed(0) + self.mock_store.set_appservice_last_pos.return_value = defer.succeed(None) + hs.get_application_service_api.return_value = self.mock_as_api + hs.get_application_service_scheduler.return_value = self.mock_scheduler hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) @@ -48,18 +49,18 @@ def test_notify_interested_services(self): self._mkservice(is_interested=False), ] - self.mock_store.get_app_services = Mock(return_value=services) - self.mock_store.get_user_by_id = Mock(return_value=[]) + self.mock_as_api.query_user.return_value = defer.succeed(True) + self.mock_store.get_app_services.return_value = services + self.mock_store.get_user_by_id.return_value = defer.succeed([]) event = Mock( sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar" ) self.mock_store.get_new_events_for_appservice.side_effect = [ - (0, [event]), - (0, []), + defer.succeed((0, [event])), + defer.succeed((0, [])), ] - self.mock_as_api.push = Mock() - yield self.handler.notify_interested_services(0) + yield defer.ensureDeferred(self.handler.notify_interested_services(0)) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event ) @@ -68,36 +69,34 @@ def test_notify_interested_services(self): def test_query_user_exists_unknown_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] - services[0].is_interested_in_user = Mock(return_value=True) - self.mock_store.get_app_services = Mock(return_value=services) - self.mock_store.get_user_by_id = Mock(return_value=None) + services[0].is_interested_in_user.return_value = True + self.mock_store.get_app_services.return_value = services + self.mock_store.get_user_by_id.return_value = defer.succeed(None) event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar") - self.mock_as_api.push = Mock() - self.mock_as_api.query_user = Mock() + self.mock_as_api.query_user.return_value = defer.succeed(True) self.mock_store.get_new_events_for_appservice.side_effect = [ - (0, [event]), - (0, []), + defer.succeed((0, [event])), + defer.succeed((0, [])), ] - yield self.handler.notify_interested_services(0) + yield defer.ensureDeferred(self.handler.notify_interested_services(0)) self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) @defer.inlineCallbacks def test_query_user_exists_known_user(self): user_id = "@someone:anywhere" services = [self._mkservice(is_interested=True)] - services[0].is_interested_in_user = Mock(return_value=True) - self.mock_store.get_app_services = Mock(return_value=services) - self.mock_store.get_user_by_id = Mock(return_value={"name": user_id}) + services[0].is_interested_in_user.return_value = True + self.mock_store.get_app_services.return_value = services + self.mock_store.get_user_by_id.return_value = defer.succeed({"name": user_id}) event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar") - self.mock_as_api.push = Mock() - self.mock_as_api.query_user = Mock() + self.mock_as_api.query_user.return_value = defer.succeed(True) self.mock_store.get_new_events_for_appservice.side_effect = [ - (0, [event]), - (0, []), + defer.succeed((0, [event])), + defer.succeed((0, [])), ] - yield self.handler.notify_interested_services(0) + yield defer.ensureDeferred(self.handler.notify_interested_services(0)) self.assertFalse( self.mock_as_api.query_user.called, "query_user called when it shouldn't have been.", @@ -107,7 +106,7 @@ def test_query_user_exists_known_user(self): def test_query_room_alias_exists(self): room_alias_str = "#foo:bar" room_alias = Mock() - room_alias.to_string = Mock(return_value=room_alias_str) + room_alias.to_string.return_value = room_alias_str room_id = "!alpha:bet" servers = ["aperture"] @@ -118,12 +117,15 @@ def test_query_room_alias_exists(self): self._mkservice_alias(is_interested_in_alias=False), ] - self.mock_store.get_app_services = Mock(return_value=services) - self.mock_store.get_association_from_room_alias = Mock( - return_value=Mock(room_id=room_id, servers=servers) + self.mock_as_api.query_alias.return_value = defer.succeed(True) + self.mock_store.get_app_services.return_value = services + self.mock_store.get_association_from_room_alias.return_value = defer.succeed( + Mock(room_id=room_id, servers=servers) ) - result = yield self.handler.query_room_alias_exists(room_alias) + result = yield defer.ensureDeferred( + self.handler.query_room_alias_exists(room_alias) + ) self.mock_as_api.query_alias.assert_called_once_with( interested_service, room_alias_str @@ -133,14 +135,14 @@ def test_query_room_alias_exists(self): def _mkservice(self, is_interested): service = Mock() - service.is_interested = Mock(return_value=is_interested) + service.is_interested.return_value = defer.succeed(is_interested) service.token = "mock_service_token" service.url = "mock_service_url" return service def _mkservice_alias(self, is_interested_in_alias): service = Mock() - service.is_interested_in_alias = Mock(return_value=is_interested_in_alias) + service.is_interested_in_alias.return_value = is_interested_in_alias service.token = "mock_service_token" service.url = "mock_service_url" return service diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 6c1dc72bd111..1acf287ca4e8 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -14,11 +14,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import mock -import signedjson.key as key -import signedjson.sign as sign +from signedjson import key as key, sign as sign from twisted.internet import defer diff --git a/tests/push/test_push_rule_evaluator.py b/tests/push/test_push_rule_evaluator.py index af35d23aea78..1f4b5ca2ac11 100644 --- a/tests/push/test_push_rule_evaluator.py +++ b/tests/push/test_push_rule_evaluator.py @@ -15,6 +15,7 @@ from synapse.api.room_versions import RoomVersions from synapse.events import FrozenEvent +from synapse.push import push_rule_evaluator from synapse.push.push_rule_evaluator import PushRuleEvaluatorForEvent from tests import unittest @@ -84,3 +85,19 @@ def test_invalid_body(self): for body in (1, True, {"foo": "bar"}): evaluator = self._get_evaluator({"body": body}) self.assertFalse(evaluator.matches(condition, "@user:test", "foo")) + + def test_tweaks_for_actions(self): + """ + This tests the behaviour of tweaks_for_actions. + """ + + actions = [ + {"set_tweak": "sound", "value": "default"}, + {"set_tweak": "highlight"}, + "notify", + ] + + self.assertEqual( + push_rule_evaluator.tweaks_for_actions(actions), + {"sound": "default", "highlight": True}, + ) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 54cd24bf645d..ae6d05a043c2 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -213,7 +213,6 @@ def test_purge_room(self): "event_push_summary", "pusher_throttle", "group_summary_rooms", - "local_invites", "room_account_data", "room_tags", # "state_groups", # Current impl leaves orphaned state groups around. diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index 9e549d8a91bc..cc264cf0b525 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -127,7 +127,7 @@ def test_visibility(self): events.append(self.get_success(store.get_event(valid_event_id))) - # Advance the time by anothe 2 days. After this, the first event should be + # Advance the time by another 2 days. After this, the first event should be # outdated but not the second one. self.reactor.advance(one_day_ms * 2 / 1000) diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 0fdff79aa79a..3c66255daca7 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -60,7 +60,7 @@ def test_put_presence(self): def test_put_presence_disabled(self): """ - PUT to the status endpoint with use_presence disbled will NOT call + PUT to the status endpoint with use_presence disabled will NOT call set_state on the presence handler. """ self.hs.config.use_presence = False diff --git a/tests/rest/client/v2_alpha/test_relations.py b/tests/rest/client/v2_alpha/test_relations.py index fd641a7c2f55..99c9f4e9282b 100644 --- a/tests/rest/client/v2_alpha/test_relations.py +++ b/tests/rest/client/v2_alpha/test_relations.py @@ -99,7 +99,7 @@ def test_deny_double_react(self): self.assertEquals(400, channel.code, channel.json_body) def test_basic_paginate_relations(self): - """Tests that calling pagination API corectly the latest relations. + """Tests that calling pagination API correctly the latest relations. """ channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction") self.assertEquals(200, channel.code, channel.json_body) diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py index 2ed9312d564d..66fa5978b2fd 100644 --- a/tests/rest/media/v1/test_media_storage.py +++ b/tests/rest/media/v1/test_media_storage.py @@ -12,8 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - import os import shutil import tempfile @@ -25,8 +23,8 @@ from mock import Mock import attr -import PIL.Image as Image from parameterized import parameterized_class +from PIL import Image as Image from twisted.internet.defer import Deferred diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 278961c33144..b589506c6043 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -25,7 +25,7 @@ from synapse.storage.engines import create_engine from tests import unittest -from tests.utils import TestHomeServer +from tests.utils import TestHomeServer, default_config class SQLBaseStoreTestCase(unittest.TestCase): @@ -49,10 +49,7 @@ def runWithConnection(func, *args, **kwargs): self.db_pool.runWithConnection = runWithConnection - config = Mock() - config._disable_native_upserts = True - config.caches = Mock() - config.caches.event_cache_size = 1 + config = default_config(name="test", parse=True) hs = TestHomeServer("test", config=config) sqlite_config = {"name": "sqlite3"} diff --git a/tests/test_mau.py b/tests/test_mau.py index 49667ed7f477..654a6fa42d3b 100644 --- a/tests/test_mau.py +++ b/tests/test_mau.py @@ -166,7 +166,7 @@ def test_trial_users_cant_come_back(self): self.do_sync_for_user(token5) self.do_sync_for_user(token6) - # But old user cant + # But old user can't with self.assertRaises(SynapseError) as cm: self.do_sync_for_user(token1) diff --git a/tests/test_utils/event_injection.py b/tests/test_utils/event_injection.py index 431e9f8e5e34..43297b530cbe 100644 --- a/tests/test_utils/event_injection.py +++ b/tests/test_utils/event_injection.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from typing import Optional, Tuple import synapse.server @@ -25,7 +24,6 @@ from tests.test_utils import get_awaitable_result - """ Utility functions for poking events into the storage of the server under test. """ diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 95301c013cba..58ee918f6533 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -124,7 +124,7 @@ async def testfunc(): @defer.inlineCallbacks def test_make_deferred_yieldable(self): - # a function which retuns an incomplete deferred, but doesn't follow + # a function which returns an incomplete deferred, but doesn't follow # the synapse rules. def blocking_function(): d = defer.Deferred() @@ -183,7 +183,7 @@ def test_nested_logging_context(self): @defer.inlineCallbacks def test_make_deferred_yieldable_with_await(self): - # an async function which retuns an incomplete coroutine, but doesn't + # an async function which returns an incomplete coroutine, but doesn't # follow the synapse rules. async def blocking_function(): diff --git a/tox.ini b/tox.ini index 83641266bb76..e5aef3c062cf 100644 --- a/tox.ini +++ b/tox.ini @@ -132,8 +132,8 @@ commands = [testenv:check_isort] skip_install = True -deps = isort -commands = /bin/sh -c "isort -c -df -sp setup.cfg -rc synapse tests scripts-dev scripts" +deps = isort==5.0.3 +commands = /bin/sh -c "isort -c --df --sp setup.cfg synapse tests scripts-dev scripts" [testenv:check-newsfragment] skip_install = True