diff --git a/changelog.d/11046.feature b/changelog.d/11046.feature new file mode 100644 index 000000000000..b5b2728a157c --- /dev/null +++ b/changelog.d/11046.feature @@ -0,0 +1 @@ +Add new external sharded Redis cache. Contributed by Nick @ Beeper. diff --git a/poetry.lock b/poetry.lock index 7c561e318235..b68802c333cb 100644 --- a/poetry.lock +++ b/poetry.lock @@ -471,6 +471,14 @@ typing-extensions = {version = "*", markers = "python_version < \"3.8\""} format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"] format_nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=1.11)"] +[[package]] +name = "jump-consistent-hash" +version = "3.2.0" +description = "Implementation of the Jump Consistent Hash algorithm" +category = "main" +optional = true +python-versions = ">=3.6" + [[package]] name = "keyring" version = "23.5.0" @@ -1546,14 +1554,14 @@ docs = ["sphinx", "repoze.sphinx.autointerface"] test = ["zope.i18nmessageid", "zope.testing", "zope.testrunner"] [extras] -all = ["matrix-synapse-ldap3", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pysaml2", "authlib", "lxml", "sentry-sdk", "jaeger-client", "opentracing", "pyjwt", "txredisapi", "hiredis", "Pympler"] +all = ["matrix-synapse-ldap3", "psycopg2", "psycopg2cffi", "psycopg2cffi-compat", "pysaml2", "authlib", "lxml", "sentry-sdk", "jaeger-client", "opentracing", "pyjwt", "txredisapi", "hiredis", "jump-consistent-hash", "Pympler"] cache_memory = ["Pympler"] jwt = ["pyjwt"] matrix-synapse-ldap3 = ["matrix-synapse-ldap3"] oidc = ["authlib"] opentracing = ["jaeger-client", "opentracing"] postgres = ["psycopg2", "psycopg2cffi", "psycopg2cffi-compat"] -redis = ["txredisapi", "hiredis"] +redis = ["txredisapi", "hiredis", "jump-consistent-hash"] saml2 = ["pysaml2"] sentry = ["sentry-sdk"] systemd = ["systemd-python"] @@ -1563,7 +1571,7 @@ url_preview 
= ["lxml"] [metadata] lock-version = "1.1" python-versions = "^3.7.1" -content-hash = "539e5326f401472d1ffc8325d53d72e544cd70156b3f43f32f1285c4c131f831" +content-hash = "9d6c57f78cd8b796f78ed600aaac47ca9786d4276609dd7ac5b778948e6975ac" [metadata.files] attrs = [ @@ -1925,6 +1933,65 @@ jsonschema = [ {file = "jsonschema-4.4.0-py3-none-any.whl", hash = "sha256:77281a1f71684953ee8b3d488371b162419767973789272434bbc3f29d9c8823"}, {file = "jsonschema-4.4.0.tar.gz", hash = "sha256:636694eb41b3535ed608fe04129f26542b59ed99808b4f688aa32dcf55317a83"}, ] +jump-consistent-hash = [ + {file = "jump-consistent-hash-3.2.0.tar.gz", hash = "sha256:211c67366dc5e4afe00fae4ae5c0550362d8c77c9d73bc6ef428caf471b71603"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:edae564198d67fe1faa06467ebf4739bdd82fcd9ddbdcd6997056b438b65fc21"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:33514ba5ed6b54cbbc315dfbb80ddaddbdaf03a0fe3a14e9781f3366cf9ca87c"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:eb8da39bc50cb60dfa4f6b590b8e0b7ce43239aa4089c3ba2d31fb6fe050809b"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3d8de52b900e074b1646204f813c304a715702d30f07de2bbdbf016a7b19e7bb"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:9c799714ef25ec24f3c3865ac2627883056e2be95fdc66fc1eca8596bf984324"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:6a56ce715e1280fcde4c1c3263f0928046ed6f20e54d27ae750b99bac14a730e"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:556bbb20fecc89af1db94b5263f00921bb2aef2481f12696665419bc813b2f4a"}, + {file = 
"jump_consistent_hash-3.2.0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:96fbf0afc99b8a59d4c0ad3aa440e9d3799e3d678572b6618608d8063788ddda"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:69d76a8961cebb968a8090ad72a65b6500deb1bfb1cf06b6f3a0289bacb911e2"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-win32.whl", hash = "sha256:24eb16806ae4a49fa6bc97188e37e52cfa6b2a1067485fb9afaf2320b597f601"}, + {file = "jump_consistent_hash-3.2.0-cp310-cp310-win_amd64.whl", hash = "sha256:d786c1f270e7e54f5f7373c2338547abfca08b6dd9a97ec47d6fb75366062f42"}, + {file = "jump_consistent_hash-3.2.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:551238273929668326c4b0e6ac7e7f847c1702148bdcd6d7e654a23f97b6f234"}, + {file = "jump_consistent_hash-3.2.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ae3c260e6a3e7c9bbcef706d64eab153a4a5bd197e126ea4efe0e39ef85212a6"}, + {file = "jump_consistent_hash-3.2.0-cp36-cp36m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:e8434ee93d4facd848a61691629d0a570af77c1c26ed7245f7ea29633ef0388d"}, + {file = "jump_consistent_hash-3.2.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:18dca5a8dc315512f3c7594558c59a7d1b8d557b372eae2466ce4660083ee5e3"}, + {file = "jump_consistent_hash-3.2.0-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:539e62ee164d08a6c594aa9a9cc99861067653a98e6175eae502ae3e607e43b4"}, + {file = "jump_consistent_hash-3.2.0-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:95db60d9503dbca4481ee8ea062bbf6aa919d9f35dc58fbfcacf6d11d0596e97"}, + {file = "jump_consistent_hash-3.2.0-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:64d6610e81d8a855af0bc8a49badd84f3b83d72fab8830344a5c01b1bcba920e"}, + {file = "jump_consistent_hash-3.2.0-cp36-cp36m-win32.whl", hash = "sha256:32b18783ca974a90d6e75f644b6f0d66c4b8faa3d36e9fe853f3bf910a2bc550"}, + 
{file = "jump_consistent_hash-3.2.0-cp36-cp36m-win_amd64.whl", hash = "sha256:742e331af2f184f3d8a1680c5ddc3b1c6f51b4620250f3800823c02dcb92f999"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b5fafacc80b7a7d3323679ab8f69f480dd426414714a73c8c820c41a7e11885a"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cad9cf1fe7f3568a80a80ac6287eadb3a7a48d5d282bca40dffc6ab00c1fc3f8"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:5766978a7dc2314ca5a59fd0b3ae6a5384e6cf460e7eba70914fd438a8b9d1b3"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f66bb7dcaf50de2ae8975c701b18fea29dbe03361523ee6f5cd0fe585b0579ac"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:c548aa0dc6c57738dcd2c77a23cc2a9f41a8201ed8906fd28f18003b822b236d"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:e9ccd795056cd1914f480eddb666cc146c7daa77bd59b918c2317e2f175e512c"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:75671183f9a95e30ec0f3cf1d6140f2db6519a0362c022e9b834e91f3988621d"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-win32.whl", hash = "sha256:aa37820833d87cca1f2a8c84331501db341b68d810df85371b77ac20f583dab6"}, + {file = "jump_consistent_hash-3.2.0-cp37-cp37m-win_amd64.whl", hash = "sha256:0b06fc7ca6d251b73a72cfccf8d23484d53e4d480384638d220837d9dbd016b9"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:15171f1e056e95826e740e5f51d87d3b9e1e1250093b6df77b109489715a7036"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = 
"sha256:f0b74c3a3a7c5422461022c044f25387570923c402709ce073e23daff40c0ceb"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:54f80456477e4da8a1a4ef7f5ee7987c2e24b4dcc0c977cc49b9dc7d11b69daf"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4f7affb8b230f4f36f91ff4130bf366aaf323772a8fab0b16ba202015499565f"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:1d8b86181f624b17b1c978522eaa220b02ab2f2fee01e74548b1b395111a6f57"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:c3cd852590ce9c0fbb86831908b1b9dd32ed9ed10c9aaad8c92899d0f4f85c79"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:1f6931ee26306967c37a77b0c06e2ef02befcfe71143892bf4fd4cd3900934dc"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:edcc1091bece4ae77d691ad827a211ad1d486ee670af08eeff461dbd337bb410"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:d830b9da5c25b1d1d63eea270dd278b220799ed8308740284420278794522077"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-win32.whl", hash = "sha256:375556cae738798bbdc82eff86c0cd74564b266d328da0e8fec5c769e40e1a0e"}, + {file = "jump_consistent_hash-3.2.0-cp38-cp38-win_amd64.whl", hash = "sha256:44130ad6eb42043f60554fc3be7bc219211d6eea96e1aa9216bee37627323a5a"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:72ffe714470238b4dae2fd8c7d26345247ef7c0b011e85501367cf9d9259bd9d"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:223ad65b7fa5c14e9a2fded80a6d3d392ba0c91e39af6d858fe8eb6153a09959"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-macosx_11_0_arm64.whl", hash 
= "sha256:c02ab09d284be5281e733c21ed1ed791f11bec3cc1711c05ba17b26de3568603"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42753979b8ee53545282d18906b93c6db99ab77133a2ea9b750f8cc8f951db1c"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:d7e7e9d48158df5b1279351b9633e6b7fa21a2c71870825a9ad7004321b8348c"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:b974d4ff7504c5efc10469dac8b279128aede01c539049e0e1fd86e649e7f37d"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:2fb2fb46ef53dcfa66cf5a9a0075a1c66e04b9fad58177b11307f0dfa08bc8e8"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:ded2a899ca80a20e8316d62f5827c829f7fb8cdbc77f46db9e8dfe9534f03542"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:f683f0a69dd651ffff7354307e8f9711c74c6d67bc62c64c5ba70fe7f06be89b"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-win32.whl", hash = "sha256:30c766d3dcc78d79a5792be308652ef5d03aaa488644e19154dfba67db480a82"}, + {file = "jump_consistent_hash-3.2.0-cp39-cp39-win_amd64.whl", hash = "sha256:32156055c2bbbe0a91621f8fdc521413cef6103484d18cf216e483201783d250"}, + {file = "jump_consistent_hash-3.2.0-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:c3bdf10571e8d4aefd53cdcc9bb5a125cdae6375175fcb71edb5d065eb16c5d0"}, + {file = "jump_consistent_hash-3.2.0-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f03e2ea907bbb639259312ac6aae18242ac38089ff109fed224f5c034ad8fd53"}, + {file = "jump_consistent_hash-3.2.0-pp37-pypy37_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = 
"sha256:fbc961389bbe642f5d059ee000c7aea3f188b236755d2ebd2c1261a6866e0fe4"}, + {file = "jump_consistent_hash-3.2.0-pp37-pypy37_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f724c034cb523f41de5755fe7450027212a9727ba3c13c8c046de11352074bf4"}, + {file = "jump_consistent_hash-3.2.0-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:4d682973063bff602ae18cd5f1ea8bc07aa0d3ff5d95239dec439fed77496f28"}, +] keyring = [ {file = "keyring-23.5.0-py3-none-any.whl", hash = "sha256:b0d28928ac3ec8e42ef4cc227822647a19f1d544f21f96457965dc01cf555261"}, {file = "keyring-23.5.0.tar.gz", hash = "sha256:9012508e141a80bd1c0b6778d5c610dd9f8c464d75ac6774248500503f972fb9"}, diff --git a/pyproject.toml b/pyproject.toml index ec6e81f25428..1446b4daee14 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -178,6 +178,7 @@ jaeger-client = { version = ">=4.0.0", optional = true } pyjwt = { version = ">=1.6.4", optional = true } txredisapi = { version = ">=1.4.7", optional = true } hiredis = { version = "*", optional = true } +jump-consistent-hash = { version = ">=3.2.0", optional = true } Pympler = { version = "*", optional = true } parameterized = { version = ">=0.7.4", optional = true } idna = { version = ">=2.5", optional = true } @@ -199,7 +200,7 @@ opentracing = ["jaeger-client", "opentracing"] jwt = ["pyjwt"] # hiredis is not a *strict* dependency, but it makes things much faster. # (if it is not installed, we fall back to slow code.) -redis = ["txredisapi", "hiredis"] +redis = ["txredisapi", "hiredis", "jump-consistent-hash"] # Required to use experimental `caches.track_memory_usage` config option. 
cache_memory = ["pympler"] test = ["parameterized", "idna"] @@ -233,7 +234,7 @@ all = [ # jwt "pyjwt", # redis - "txredisapi", "hiredis", + "txredisapi", "hiredis", "jump-consistent-hash", # cache_memory "pympler", # omitted: diff --git a/stubs/jump.pyi b/stubs/jump.pyi new file mode 100644 index 000000000000..dea10fbce44e --- /dev/null +++ b/stubs/jump.pyi @@ -0,0 +1,15 @@ +# Copyright 2022 Beeper +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def hash(key: int, buckets: int) -> int: ... diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi index 695a2307c2c5..4a4bbc40b4ba 100644 --- a/stubs/txredisapi.pyi +++ b/stubs/txredisapi.pyi @@ -33,7 +33,9 @@ class RedisProtocol(protocol.Protocol): only_if_not_exists: bool = False, only_if_exists: bool = False, ) -> "Deferred[None]": ... + def mset(self, values: dict[str, Any]) -> "Deferred[Any]": ... def get(self, key: str) -> "Deferred[Any]": ... + def mget(self, keys: list[str]) -> "Deferred[Any]": ... class SubscriberProtocol(RedisProtocol): def __init__(self, *args: object, **kwargs: object): ... 
diff --git a/synapse/config/redis.py b/synapse/config/redis.py
index ec7a735418d6..5fd02c0bf285 100644
--- a/synapse/config/redis.py
+++ b/synapse/config/redis.py
@@ -35,6 +35,14 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
         self.redis_port = redis_config.get("port", 6379)
         self.redis_password = redis_config.get("password")
 
+        # Default to an empty mapping so the cache_shard_* attributes always
+        # exist: ExternalShardedCache reads them unconditionally whenever
+        # Redis is enabled, even if no `cache_shards` section is configured.
+        cache_shard_config = redis_config.get("cache_shards") or {}
+        self.cache_shard_hosts = cache_shard_config.get("hosts", [])
+        self.cache_shard_expire = cache_shard_config.get("expire_caches", False)
+        self.cache_shard_ttl = cache_shard_config.get("cache_entry_ttl", False)
+
     def generate_config_section(self, **kwargs: Any) -> str:
         return """\
         # Configuration for Redis when using workers. This *must* be enabled when
@@ -54,4 +62,15 @@ def generate_config_section(self, **kwargs: Any) -> str:
         # Optional password if configured on the Redis instance
         #
         #password:
+
+        # Optional one or more Redis hosts to use for long term shared caches.
+        # Should be configured to automatically expire records when out of
+        # memory, and not be the same instance as used for replication.
+        #
+        #cache_shards:
+        #  expire_caches: false
+        #  cache_entry_ttl: 30m
+        #  hosts:
+        #    - host: localhost
+        #      port: 6379
         """
diff --git a/synapse/replication/tcp/external_sharded_cache.py b/synapse/replication/tcp/external_sharded_cache.py
new file mode 100644
index 000000000000..776cb7b65888
--- /dev/null
+++ b/synapse/replication/tcp/external_sharded_cache.py
@@ -0,0 +1,191 @@
+# Copyright 2022 Beeper
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import binascii +import logging +import marshal +from collections import defaultdict +from typing import TYPE_CHECKING, Any, Iterable, Union + +import jump +from prometheus_client import Counter, Histogram + +from twisted.internet import defer + +from synapse.logging import opentracing +from synapse.logging.context import make_deferred_yieldable +from synapse.replication.tcp.redis import lazyConnection +from synapse.util import unwrapFirstError + +if TYPE_CHECKING: + from synapse.server import HomeServer + +set_counter = Counter( + "synapse_external_sharded_cache_set", + "Number of times we set a cache", + labelnames=["cache_name"], +) + +get_counter = Counter( + "synapse_external_sharded_cache_get", + "Number of times we get a cache", + labelnames=["cache_name", "hit"], +) + +response_timer = Histogram( + "synapse_external_sharded_cache_response_time_seconds", + "Time taken to get a response from Redis for a cache get/set request", + labelnames=["method"], + buckets=( + 0.001, + 0.002, + 0.005, + 0.01, + 0.02, + 0.05, + ), +) + + +logger = logging.getLogger(__name__) + + +class ExternalShardedCache: + """A cache backed by an external Redis. Does nothing if no Redis is + configured. 
+ """ + + def __init__(self, hs: "HomeServer"): + self._redis_shards = [] + + if hs.config.redis.redis_enabled and hs.config.redis.cache_shard_hosts: + for shard in hs.config.redis.cache_shard_hosts: + logger.info( + "Connecting to redis (host=%r port=%r) for external cache", + shard["host"], + shard["port"], + ) + self._redis_shards.append( + lazyConnection( + hs=hs, + host=shard["host"], + port=shard["port"], + reconnect=True, + ), + ) + + def _get_redis_key(self, cache_name: str, key: str) -> str: + return "sharded_cache_v1:%s:%s" % (cache_name, key) + + def _get_redis_shard_id(self, redis_key: str) -> int: + key = binascii.crc32(redis_key.encode()) & 0xFFFFFFFF + idx = jump.hash(key, len(self._redis_shards)) + return idx + + def is_enabled(self) -> bool: + """Whether the external cache is used or not. + + It's safe to use the cache when this returns false, the methods will + just no-op, but the function is useful to avoid doing unnecessary work. + """ + return bool(self._redis_shards) + + async def mset( + self, + cache_name: str, + values: dict[str, Any], + ) -> None: + """Add the key/value combinations to the named cache, with the expiry time given.""" + + if not self.is_enabled(): + return + + set_counter.labels(cache_name).inc(len(values)) + + logger.debug("Caching %s: %r", cache_name, values) + + shard_id_to_encoded_values: dict[int, dict[str, Any]] = defaultdict(dict) + + for key, value in values.items(): + redis_key = self._get_redis_key(cache_name, key) + shard_id = self._get_redis_shard_id(redis_key) + shard_id_to_encoded_values[shard_id][redis_key] = marshal.dumps(value) + + with opentracing.start_active_span( + "ExternalShardedCache.set", + tags={opentracing.SynapseTags.CACHE_NAME: cache_name}, + ): + with response_timer.labels("set").time(): + deferreds = [ + self._redis_shards[shard_id].mset(values) + for shard_id, values in shard_id_to_encoded_values.items() + ] + await make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True) 
+                ).addErrback(unwrapFirstError)
+
+    async def set(self, cache_name: str, key: str, value: Any) -> None:
+        await self.mset(cache_name, {key: value})
+
+    async def _mget_shard(
+        self, shard_id: int, key_mapping: "dict[str, str]"
+    ) -> "dict[str, Any]":
+        results = await self._redis_shards[shard_id].mget(list(key_mapping.values()))
+        original_keys = list(key_mapping.keys())
+        mapped_results: dict[str, Any] = {}
+        for i, result in enumerate(results):
+            if result:
+                result = marshal.loads(result)
+            mapped_results[original_keys[i]] = result
+        return mapped_results
+
+    async def mget(self, cache_name: str, keys: Iterable[str]) -> "dict[str, Any]":
+        """Look up key/value combinations in the named cache."""
+
+        if not self.is_enabled():
+            return {}
+
+        shard_id_to_key_mapping: dict[int, dict[str, str]] = defaultdict(dict)
+
+        for key in keys:
+            redis_key = self._get_redis_key(cache_name, key)
+            shard_id = self._get_redis_shard_id(redis_key)
+            shard_id_to_key_mapping[shard_id][key] = redis_key
+
+        with opentracing.start_active_span(
+            "ExternalShardedCache.get",
+            tags={opentracing.SynapseTags.CACHE_NAME: cache_name},
+        ):
+            with response_timer.labels("get").time():
+                deferreds = [
+                    defer.ensureDeferred(self._mget_shard(shard_id, keys))
+                    for shard_id, keys in shard_id_to_key_mapping.items()
+                ]
+                results: Union[
+                    list, list[dict[str, Any]]
+                ] = await make_deferred_yieldable(
+                    defer.gatherResults(deferreds, consumeErrors=True)
+                ).addErrback(
+                    unwrapFirstError
+                )
+
+        combined_results: dict[str, Any] = {}
+        for result in results:
+            combined_results.update(result)
+
+        logger.debug("Got cache result %s %s: %r", cache_name, keys, combined_results)
+
+        get_counter.labels(cache_name, True).inc(
+            sum(v is not None for v in combined_results.values())
+        )
+
+        return combined_results
diff --git a/synapse/server.py b/synapse/server.py
index a66ec228dbab..ff9297fbc6af 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -110,6 +110,7 @@ from synapse.push.pusherpool import PusherPool
 from synapse.replication.tcp.client
import ReplicationDataHandler from synapse.replication.tcp.external_cache import ExternalCache +from synapse.replication.tcp.external_sharded_cache import ExternalShardedCache from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.resource import ReplicationStreamer from synapse.replication.tcp.streams import STREAMS_MAP, Stream @@ -771,6 +772,10 @@ def get_event_auth_handler(self) -> EventAuthHandler: def get_external_cache(self) -> ExternalCache: return ExternalCache(self) + @cache_in_self + def get_external_sharded_cache(self) -> ExternalShardedCache: + return ExternalShardedCache(self) + @cache_in_self def get_account_handler(self) -> AccountHandler: return AccountHandler(self) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index e222b7bd1f88..828da7bf3a9f 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -830,15 +830,41 @@ async def _get_joined_profiles_from_event_ids( Map from event ID to `user_id` and ProfileInfo (or None if not join event). 
""" - rows = await self.db_pool.simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=event_ids, - retcols=("user_id", "display_name", "avatar_url", "event_id"), - keyvalues={"membership": Membership.JOIN}, - batch_size=500, - desc="_get_joined_profiles_from_event_ids", - ) + sharded_cache = self.hs.get_external_sharded_cache() + sharded_cache_enabled = sharded_cache.is_enabled() + + missing = [] + rows = [] + + if sharded_cache_enabled: + event_id_to_row = await sharded_cache.mget( + "_get_joined_profile_from_event_id", event_ids + ) + for event_id, row in event_id_to_row.items(): + if row: + rows.append(row) + else: + missing.append(event_id) + else: + missing = list(event_ids) + + if missing: + missing_rows = await self.db_pool.simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=event_ids, + retcols=("user_id", "display_name", "avatar_url", "event_id"), + keyvalues={"membership": Membership.JOIN}, + batch_size=500, + desc="_get_joined_profiles_from_event_ids", + ) + rows += missing_rows + + if sharded_cache_enabled and missing_rows: + await sharded_cache.mset( + "_get_joined_profile_from_event_id", + {row["event_id"]: row for row in missing_rows}, + ) return { row["event_id"]: (