This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't hammer the database for destination retry timings every ~5mins #10036

Merged · 6 commits · May 21, 2021
1 change: 1 addition & 0 deletions changelog.d/10036.misc
@@ -0,0 +1 @@
Properly invalidate caches for destination retry timings when they change (instead of expiring entries every 5 minutes).
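At its core, the PR swaps a timer-based `ExpiringCache` for Synapse's `@cached` descriptor plus explicit invalidation on write. A self-contained sketch of the two caching behaviours, using simplified stand-in classes rather than Synapse's real ones:

```python
import time
from typing import Dict, Optional, Tuple

# Simplified stand-ins for illustration only -- not Synapse's real classes.

class ExpiringCacheSketch:
    """Before: entries silently expire after a TTL, so every destination's
    retry timings are re-read from the database roughly every 5 minutes,
    whether or not they changed."""

    def __init__(self, ttl_seconds: float = 5 * 60) -> None:
        self._ttl = ttl_seconds
        self._entries: Dict[str, Tuple[float, object]] = {}

    def get(self, key: str) -> Optional[object]:
        hit = self._entries.get(key)
        if hit is None or time.monotonic() - hit[0] > self._ttl:
            return None  # missing or expired: caller falls through to the DB
        return hit[1]

    def set(self, key: str, value: object) -> None:
        self._entries[key] = (time.monotonic(), value)


class InvalidatedCacheSketch:
    """After: entries live until a writer explicitly invalidates them, so a
    destination whose timings never change never touches the database again."""

    def __init__(self) -> None:
        self._entries: Dict[str, object] = {}

    def get(self, key: str) -> Optional[object]:
        return self._entries.get(key)

    def set(self, key: str, value: object) -> None:
        self._entries[key] = value

    def invalidate(self, key: str) -> None:
        # Synapse's equivalent also pokes the replication stream so that
        # workers drop their copies too (see _invalidate_cache_and_stream).
        self._entries.pop(key, None)
```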
2 changes: 0 additions & 2 deletions synapse/app/generic_worker.py
@@ -61,7 +61,6 @@
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events, login, presence, room
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -237,7 +236,6 @@ class GenericWorkerSlavedStore(
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
SlavedFilteringStore,
2 changes: 1 addition & 1 deletion synapse/federation/transport/server.py
@@ -160,7 +160,7 @@ async def authenticate_request(self, request, content):
# If we get a valid signed request from the other side, its probably
# alive
retry_timings = await self.store.get_destination_retry_timings(origin)
if retry_timings and retry_timings["retry_last_ts"]:
if retry_timings and retry_timings.retry_last_ts:
run_in_background(self._reset_retry_timings, origin)

return origin
21 changes: 0 additions & 21 deletions synapse/replication/slave/storage/transactions.py

This file was deleted.

4 changes: 2 additions & 2 deletions synapse/storage/databases/main/__init__.py
@@ -67,7 +67,7 @@
from .stats import StatsStore
from .stream import StreamStore
from .tags import TagsStore
from .transactions import TransactionStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
from .user_erasure_store import UserErasureStore
@@ -83,7 +83,7 @@ class DataStore(
StreamStore,
ProfileStore,
PresenceStore,
TransactionStore,
TransactionWorkerStore,
DirectoryStore,
KeyStore,
StateStore,
66 changes: 37 additions & 29 deletions synapse/storage/databases/main/transactions.py
@@ -16,13 +16,15 @@
from collections import namedtuple
from typing import Iterable, List, Optional, Tuple

import attr
from canonicaljson import encode_canonical_json

from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage._base import db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.descriptors import cached

db_binary_type = memoryview

@@ -38,10 +40,23 @@
"_TransactionRow", ("response_code", "response_json")
)

SENTINEL = object()

@attr.s(slots=True, frozen=True, auto_attribs=True)
class DestinationRetryTimings:
"""The current destination retry timing info for a remote server."""

class TransactionWorkerStore(SQLBaseStore):
# The first time we tried and failed to reach the remote server, in ms.
failure_ts: int

# The last time we tried and failed to reach the remote server, in ms.
retry_last_ts: int

# How long since the last time we tried to reach the remote server before
# trying again, in ms.
retry_interval: int


class TransactionWorkerStore(CacheInvalidationWorkerStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

@@ -60,19 +75,6 @@ def _cleanup_transactions_txn(txn):
"_cleanup_transactions", _cleanup_transactions_txn
)


class TransactionStore(TransactionWorkerStore):
"""A collection of queries for handling PDUs."""

def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)

self._destination_retry_cache = ExpiringCache(
cache_name="get_destination_retry_timings",
clock=self._clock,
expiry_ms=5 * 60 * 1000,
)

async def get_received_txn_response(
self, transaction_id: str, origin: str
) -> Optional[Tuple[int, JsonDict]]:
@@ -145,7 +147,11 @@ async def set_received_txn_response(
desc="set_received_txn_response",
)

async def get_destination_retry_timings(self, destination):
@cached(max_entries=10000)
Member:

10k destinations seems high, but maybe it isn't? (Did you just grab a random number or did you check the previous size of the cache?)

Member Author:

The flip side was that 1000 felt a bit small. I mostly just pulled it out of thin air, and it matches some other "small" caches.

Member:

Ah, yeah I guess 1000 doesn't feel big enough. 👍 Was mostly curious where it came from. Thin air seems OK to me. ✈️

async def get_destination_retry_timings(
self,
destination: str,
) -> Optional[DestinationRetryTimings]:
"""Gets the current retry timings (if any) for a given destination.

Args:
@@ -156,34 +162,29 @@ def _get_destination_retry_timings(
Otherwise a DestinationRetryTimings for the retry scheme
"""

result = self._destination_retry_cache.get(destination, SENTINEL)
if result is not SENTINEL:
return result

result = await self.db_pool.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings,
destination,
)

# We don't hugely care about race conditions between getting and
# invalidating the cache, since we time out fairly quickly anyway.
self._destination_retry_cache[destination] = result
return result

def _get_destination_retry_timings(self, txn, destination):
def _get_destination_retry_timings(
self, txn, destination: str
) -> Optional[DestinationRetryTimings]:
result = self.db_pool.simple_select_one_txn(
txn,
table="destinations",
keyvalues={"destination": destination},
retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
retcols=("failure_ts", "retry_last_ts", "retry_interval"),
allow_none=True,
)

# check we have a row and retry_last_ts is not null or zero
# (retry_last_ts can't be negative)
if result and result["retry_last_ts"]:
return result
return DestinationRetryTimings(**result)
else:
return None

Expand All @@ -204,7 +205,6 @@ async def set_destination_retry_timings(
retry_interval: how long until next retry in ms
"""

self._destination_retry_cache.pop(destination, None)
if self.database_engine.can_native_upsert:
return await self.db_pool.runInteraction(
"set_destination_retry_timings",
@@ -252,6 +252,10 @@ def _set_destination_retry_timings_native(

txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))

self._invalidate_cache_and_stream(
txn, self.get_destination_retry_timings, (destination,)
)

def _set_destination_retry_timings_emulated(
self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
@@ -295,6 +299,10 @@ def _set_destination_retry_timings_emulated(
},
)

self._invalidate_cache_and_stream(
txn, self.get_destination_retry_timings, (destination,)
)

async def store_destination_rooms_entries(
self,
destinations: Iterable[str],
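Note that both the native and emulated upsert paths invalidate inside the write transaction. In Synapse, `_invalidate_cache_and_stream` also publishes the invalidation over the replication stream, which is what lets workers use `TransactionWorkerStore` directly and why `SlavedTransactionStore` could be deleted above. A rough, self-contained sketch of the commit-then-invalidate pattern (`FakeTxn`, the module-level `cache`, and `replication_stream` are all illustrative stand-ins, not Synapse's API):

```python
from typing import Callable, Dict, List, Tuple

class FakeTxn:
    """Illustrative transaction: callbacks run only after a successful commit."""

    def __init__(self) -> None:
        self._after_commit: List[Callable[[], None]] = []

    def call_after(self, fn: Callable[[], None]) -> None:
        self._after_commit.append(fn)

    def commit(self) -> None:
        for fn in self._after_commit:
            fn()


cache: Dict[Tuple[str, ...], object] = {}
replication_stream: List[Tuple[str, ...]] = []  # what workers would consume


def invalidate_cache_and_stream(txn: FakeTxn, key: Tuple[str, ...]) -> None:
    """Queue the invalidation so it only happens if the write commits."""

    def _invalidate() -> None:
        cache.pop(key, None)            # drop the local entry
        replication_stream.append(key)  # tell workers to drop theirs too

    txn.call_after(_invalidate)


# Usage: write the row, queue the invalidation, then commit.
txn = FakeTxn()
cache[("example.com",)] = "stale timings"
# ... the UPDATE of the destinations table would happen here ...
invalidate_cache_and_stream(txn, ("example.com",))
txn.commit()
assert ("example.com",) not in cache
```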
8 changes: 3 additions & 5 deletions synapse/util/retryutils.py
@@ -82,11 +82,9 @@ async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **k
retry_timings = await store.get_destination_retry_timings(destination)

if retry_timings:
failure_ts = retry_timings["failure_ts"]
retry_last_ts, retry_interval = (
retry_timings["retry_last_ts"],
retry_timings["retry_interval"],
)
failure_ts = retry_timings.failure_ts
retry_last_ts = retry_timings.retry_last_ts
retry_interval = retry_timings.retry_interval

now = int(clock.time_msec())

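The retryutils change is mechanical: the timings now arrive as a `DestinationRetryTimings` object, so dict lookups become attribute access, and `get_retry_limiter` is used as before. A hedged caller sketch (`send_with_backoff` and `do_request` are made-up names; the limiter pattern follows retryutils' own usage):

```python
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter

async def send_with_backoff(destination: str, clock, store, do_request):
    """Hypothetical caller: skip remotes that are still backing off."""
    try:
        limiter = await get_retry_limiter(destination, clock, store)
    except NotRetryingDestination:
        return None  # still inside the backoff window for this remote

    # Leaving the block records success (resetting the timings) or failure
    # (bumping retry_interval) back into the now-cached retry timings.
    with limiter:
        return await do_request(destination)
```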
8 changes: 1 addition & 7 deletions tests/handlers/test_typing.py
@@ -89,14 +89,8 @@ def prepare(self, reactor, clock, hs):
self.event_source = hs.get_event_sources().sources["typing"]

self.datastore = hs.get_datastore()
retry_timings_res = {
"destination": "",
"retry_last_ts": 0,
"retry_interval": 0,
"failure_ts": None,
}
self.datastore.get_destination_retry_timings = Mock(
return_value=defer.succeed(retry_timings_res)
return_value=defer.succeed(None)
)

self.datastore.get_device_updates_by_remote = Mock(
8 changes: 6 additions & 2 deletions tests/storage/test_transactions.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.storage.databases.main.transactions import DestinationRetryTimings
from synapse.util.retryutils import MAX_RETRY_INTERVAL

from tests.unittest import HomeserverTestCase
@@ -36,8 +37,11 @@ def test_get_set_transactions(self):
d = self.store.get_destination_retry_timings("example.com")
r = self.get_success(d)

self.assert_dict(
{"retry_last_ts": 50, "retry_interval": 100, "failure_ts": 1000}, r
self.assertEqual(
DestinationRetryTimings(
retry_last_ts=50, retry_interval=100, failure_ts=1000
),
r,
)

def test_initial_set_transactions(self):
18 changes: 11 additions & 7 deletions tests/util/test_retryutils.py
@@ -51,10 +51,12 @@ def test_limiter(self):
except AssertionError:
pass

self.pump()

new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertEqual(new_timings["failure_ts"], failure_ts)
self.assertEqual(new_timings["retry_last_ts"], failure_ts)
self.assertEqual(new_timings["retry_interval"], MIN_RETRY_INTERVAL)
self.assertEqual(new_timings.failure_ts, failure_ts)
self.assertEqual(new_timings.retry_last_ts, failure_ts)
self.assertEqual(new_timings.retry_interval, MIN_RETRY_INTERVAL)

# now if we try again we should get a failure
self.get_failure(
@@ -77,14 +79,16 @@ def test_limiter(self):
except AssertionError:
pass

self.pump()

new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertEqual(new_timings["failure_ts"], failure_ts)
self.assertEqual(new_timings["retry_last_ts"], retry_ts)
self.assertEqual(new_timings.failure_ts, failure_ts)
self.assertEqual(new_timings.retry_last_ts, retry_ts)
self.assertGreaterEqual(
new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5
new_timings.retry_interval, MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5
)
self.assertLessEqual(
new_timings["retry_interval"], MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0
new_timings.retry_interval, MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0
)

#