From bfbc907cec96ce9a64730930f63ed400c1aa3b5b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 10:40:31 +0100 Subject: [PATCH 1/6] Prefill state caches --- synapse/storage/_base.py | 8 ++++---- synapse/storage/events.py | 10 ++++++++-- synapse/storage/state.py | 8 ++++++++ 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c659004e8d10..58b73af7d2b6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,12 +60,12 @@ def __init__(self, txn, name, database_engine, after_callbacks): object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) - def call_after(self, callback, *args): + def call_after(self, callback, *args, **kwargs): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. """ - self.after_callbacks.append((callback, args)) + self.after_callbacks.append((callback, args, kwargs)) def __getattr__(self, name): return getattr(self.txn, name) @@ -319,8 +319,8 @@ def inner_func(conn, *args, **kwargs): inner_func, *args, **kwargs ) finally: - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + for after_callback, after_args, after_kwargs in after_callbacks: + after_callback(*after_args, **after_kwargs) defer.returnValue(result) @defer.inlineCallbacks diff --git a/synapse/storage/events.py b/synapse/storage/events.py index dbd63078c6c7..0dffafd90dff 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -374,6 +374,7 @@ def _persist_events(self, events_and_contexts, backfilled=False, new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + for event, context in chunk: if context.app_service: origin_type = "local" @@ -387,6 +388,11 @@ def _persist_events(self, events_and_contexts, backfilled=False, event_counter.inc(event.type, origin_type, origin_entity) + for room_id, (_, _, new_state) in current_state_for_room.iteritems(): + self.get_current_state_ids.prefill( + (room_id, ), new_state + ) + @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): """Calculates the new forward extremeties for a room given events to @@ -545,7 +551,7 @@ def get_events(ev_ids): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert)) + defer.returnValue((to_delete, to_insert, current_state)) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -698,7 +704,7 @@ def _persist_events_txn(self, txn, events_and_contexts, backfilled, def _update_current_state_txn(self, txn, state_delta_by_room): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert = current_state_tuple + to_delete, to_insert, _ = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a16afa8df5f9..1e1ce87e0ede 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -227,6 +227,14 @@ def _store_mult_state_groups_txn(self, txn, events_and_contexts): ], ) + txn.call_after( + self._state_group_cache.update, + self._state_group_cache.sequence, + key=context.state_group, + value=context.current_state_ids, + full=True, + ) + self._simple_insert_many_txn( txn, table="event_to_state_groups", From e0d2f6d5b02dd208bc55434b5c2d386827486e9f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 11:36:11 +0100 Subject: [PATCH 2/6] Add more granular event send metrics --- synapse/storage/events.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0dffafd90dff..36574f78b8ff 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -374,6 +374,18 @@ def _persist_events(self, events_and_contexts, backfilled=False, new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + for event, context in chunk: + if context.app_service: + origin_type = "local" + origin_entity = context.app_service.id + elif self.hs.is_mine_id(event.sender): + origin_type = "local" + origin_entity = "*client*" + else: + origin_type = "remote" + origin_entity = get_domain_from_id(event.sender) + + event_counter.inc(event.type, origin_type, origin_entity) for event, context in chunk: if context.app_service: From 871605f4e20cce3f093b2eae0f3d2ad7fb43a640 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 May 2017 09:56:05 +0100 Subject: [PATCH 3/6] Comments --- synapse/storage/events.py | 6 +++--- synapse/storage/state.py | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 36574f78b8ff..5db7ec16223b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -453,10 +453,10 @@ def _calculate_state_delta(self, room_id, events_context, new_latest_event_ids): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, i.e. - (type, state_key) -> event_id. `to_delete` are the entries to + 3-tuple (to_delete, to_insert, new_state) where both are state dicts, + i.e. (type, state_key) -> event_id. `to_delete` are the entries to first be deleted from current_state_events, `to_insert` are entries - to insert. + to insert. `new_state` is the full set of state. May return None if there are no changes to be applied. """ # Now we need to work out the different state sets for diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 1e1ce87e0ede..5d6f7dfa288e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -227,6 +227,9 @@ def _store_mult_state_groups_txn(self, txn, events_and_contexts): ], ) + # Prefill the state group cache with this group. + # It's fine to use the sequence like this as the state group map + # is immutable. txn.call_after( self._state_group_cache.update, self._state_group_cache.sequence, From e4435b014e50a10ad89c201d6f91b6be35a9b02f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 May 2017 10:00:29 +0100 Subject: [PATCH 4/6] Update comment --- synapse/storage/state.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5d6f7dfa288e..03981f5d2b04 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -229,7 +229,8 @@ def _store_mult_state_groups_txn(self, txn, events_and_contexts): # Prefill the state group cache with this group. # It's fine to use the sequence like this as the state group map - # is immutable. + # is immutable. (If the map wasn't immutable then this prefill could + # race with another update) txn.call_after( self._state_group_cache.update, self._state_group_cache.sequence, From 608b5a6317ce3797ff279f6d1a8a39f475b55736 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 May 2017 12:55:29 +0100 Subject: [PATCH 5/6] Take a copy before prefilling, as it may be a frozendict --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 03981f5d2b04..85acf2ad1e13 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -235,7 +235,7 @@ def _store_mult_state_groups_txn(self, txn, events_and_contexts): self._state_group_cache.update, self._state_group_cache.sequence, key=context.state_group, - value=context.current_state_ids, + value=dict(context.current_state_ids), full=True, ) From 331570ea6f97d570cf2774cd0700eb588e9fb1d7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 May 2017 15:33:07 +0100 Subject: [PATCH 6/6] Remove spurious merge artifacts --- synapse/storage/events.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5db7ec16223b..12dd74daa337 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -387,19 +387,6 @@ def _persist_events(self, events_and_contexts, backfilled=False, event_counter.inc(event.type, origin_type, origin_entity) - for event, context in chunk: - if context.app_service: - origin_type = "local" - origin_entity = context.app_service.id - elif self.hs.is_mine_id(event.sender): - origin_type = "local" - origin_entity = "*client*" - else: - origin_type = "remote" - origin_entity = get_domain_from_id(event.sender) - - event_counter.inc(event.type, origin_type, origin_entity) - for room_id, (_, _, new_state) in current_state_for_room.iteritems(): self.get_current_state_ids.prefill( (room_id, ), new_state