Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split out state storage into separate data store. #6245

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6245.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Split out state storage into separate data store.
7 changes: 5 additions & 2 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class AdminHandler(BaseHandler):
def __init__(self, hs):
super(AdminHandler, self).__init__(hs)

self.storage = hs.get_storage()
self.state_store = self.storage.state

@defer.inlineCallbacks
def get_whois(self, user):
connections = []
Expand Down Expand Up @@ -205,7 +208,7 @@ def export_user_data(self, user_id, writer):

from_key = events[-1].internal_metadata.after

events = yield filter_events_for_client(self.store, user_id, events)
events = yield filter_events_for_client(self.storage, user_id, events)

writer.write_events(room_id, events)

Expand Down Expand Up @@ -241,7 +244,7 @@ def export_user_data(self, user_id, writer):
for event_id in extremities:
if not event_to_unseen_prevs[event_id]:
continue
state = yield self.store.get_state_for_event(event_id)
state = yield self.state_store.get_state_for_event(event_id)
writer.write_state(room_id, event_id, state)

return writer.finished()
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(self, hs):

self.hs = hs
self.state = hs.get_state_handler()
self.state_store = hs.get_storage().state
self._auth_handler = hs.get_auth_handler()

@trace
Expand Down Expand Up @@ -178,7 +179,7 @@ def get_user_ids_changed(self, user_id, from_token):
continue

# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
prev_state_ids = yield self.state_store.get_state_ids_for_events(event_ids)

# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
Expand Down
6 changes: 5 additions & 1 deletion synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ def get_stream(


class EventHandler(BaseHandler):
def __init__(self, hs):
super(EventHandler, self).__init__(hs)
self.storage = hs.get_storage()

@defer.inlineCallbacks
def get_event(self, user, room_id, event_id):
"""Retrieve a single specified event.
Expand All @@ -172,7 +176,7 @@ def get_event(self, user, room_id, event_id):
is_peeking = user.to_string() not in users

filtered = yield filter_events_for_client(
self.store, user.to_string(), [event], is_peeking=is_peeking
self.storage, user.to_string(), [event], is_peeking=is_peeking
)

if not filtered:
Expand Down
19 changes: 10 additions & 9 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(self, hs):

self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.state_store = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
Expand Down Expand Up @@ -325,7 +326,7 @@ def on_receive_pdu(self, origin, pdu, sent_to_us_directly=False):
event_map = {event_id: pdu}
try:
# Get the state of the events we know about
ours = yield self.store.get_state_groups_ids(room_id, seen)
ours = yield self.state_store.get_state_groups_ids(room_id, seen)

# state_maps is a list of mappings from (type, state_key) to event_id
state_maps = list(
Expand Down Expand Up @@ -889,7 +890,7 @@ def maybe_backfill(self, room_id, current_depth):
# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = yield filter_events_for_server(
self.store,
self.storage,
self.server_name,
list(extremities_events.values()),
redact=False,
Expand Down Expand Up @@ -1550,7 +1551,7 @@ def get_state_for_pdu(self, room_id, event_id):
event_id, allow_none=False, check_room_id=room_id
)

state_groups = yield self.store.get_state_groups(room_id, [event_id])
state_groups = yield self.state_store.get_state_groups(room_id, [event_id])

if state_groups:
_, state = list(iteritems(state_groups)).pop()
Expand Down Expand Up @@ -1579,7 +1580,7 @@ def get_state_ids_for_pdu(self, room_id, event_id):
event_id, allow_none=False, check_room_id=room_id
)

state_groups = yield self.store.get_state_groups_ids(room_id, [event_id])
state_groups = yield self.state_store.get_state_groups_ids(room_id, [event_id])

if state_groups:
_, state = list(state_groups.items()).pop()
Expand Down Expand Up @@ -1607,7 +1608,7 @@ def on_backfill_request(self, origin, room_id, pdu_list, limit):

events = yield self.store.get_backfill_events(room_id, pdu_list, limit)

events = yield filter_events_for_server(self.store, origin, events)
events = yield filter_events_for_server(self.storage, origin, events)

return events

Expand Down Expand Up @@ -1637,7 +1638,7 @@ def get_persisted_pdu(self, origin, event_id):
if not in_room:
raise AuthError(403, "Host not in room.")

events = yield filter_events_for_server(self.store, origin, [event])
events = yield filter_events_for_server(self.storage, origin, [event])
event = events[0]
return event
else:
Expand Down Expand Up @@ -1903,7 +1904,7 @@ def _check_for_soft_fail(self, event, state, backfilled):
# given state at the event. This should correctly handle cases
# like bans, especially with state res v2.

state_sets = yield self.store.get_state_groups(
state_sets = yield self.state_store.get_state_groups(
event.room_id, extrem_ids
)
state_sets = list(state_sets.values())
Expand Down Expand Up @@ -1994,7 +1995,7 @@ def on_get_missing_events(
)

missing_events = yield filter_events_for_server(
self.store, origin, missing_events
self.storage, origin, missing_events
)

return missing_events
Expand Down Expand Up @@ -2235,7 +2236,7 @@ def _update_context_for_auth_events(self, event, context, auth_events, event_key

# create a new state group as a delta from the existing one.
prev_group = context.state_group
state_group = yield self.store.store_state_group(
state_group = yield self.state_store.store_state_group(
event.event_id,
event.room_id,
prev_group=prev_group,
Expand Down
14 changes: 9 additions & 5 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def __init__(self, hs):
self.validator = EventValidator()
self.snapshot_cache = SnapshotCache()
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
self.state_store = self.storage.state

def snapshot_all_rooms(
self,
Expand Down Expand Up @@ -169,7 +171,7 @@ def handle_room(event):
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
deferred_room_state = run_in_background(
self.store.get_state_for_events, [event.event_id]
self.state_store.get_state_for_events, [event.event_id]
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
Expand All @@ -189,7 +191,9 @@ def handle_room(event):
)
).addErrback(unwrapFirstError)

messages = yield filter_events_for_client(self.store, user_id, messages)
messages = yield filter_events_for_client(
self.storage, user_id, messages
)

start_token = now_token.copy_and_replace("room_key", token)
end_token = now_token.copy_and_replace("room_key", room_end_token)
Expand Down Expand Up @@ -307,7 +311,7 @@ def room_initial_sync(self, requester, room_id, pagin_config=None):
def _room_initial_sync_parted(
self, user_id, room_id, pagin_config, membership, member_event_id, is_peeking
):
room_state = yield self.store.get_state_for_events([member_event_id])
room_state = yield self.state_store.get_state_for_events([member_event_id])

room_state = room_state[member_event_id]

Expand All @@ -322,7 +326,7 @@ def _room_initial_sync_parted(
)

messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking
self.storage, user_id, messages, is_peeking=is_peeking
)

start_token = StreamToken.START.copy_and_replace("room_key", token)
Expand Down Expand Up @@ -414,7 +418,7 @@ def get_receipts():
)

messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking
self.storage, user_id, messages, is_peeking=is_peeking
)

start_token = now_token.copy_and_replace("room_key", token)
Expand Down
10 changes: 6 additions & 4 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def __init__(self, hs):
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.state_store = self.storage.state
self._event_serializer = hs.get_event_client_serializer()

@defer.inlineCallbacks
Expand All @@ -82,7 +84,7 @@ def get_room_data(
data = yield self.state.get_current_state(room_id, event_type, state_key)
elif membership == Membership.LEAVE:
key = (event_type, state_key)
room_state = yield self.store.get_state_for_events(
room_state = yield self.state_store.get_state_for_events(
[membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
Expand Down Expand Up @@ -135,12 +137,12 @@ def get_state_events(
raise NotFoundError("Can't find event for token %s" % (at_token,))

visible_events = yield filter_events_for_client(
self.store, user_id, last_events
self.storage, user_id, last_events
)

event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
room_state = yield self.state_store.get_state_for_events(
[event.event_id], state_filter=state_filter
)
room_state = room_state[event.event_id]
Expand All @@ -161,7 +163,7 @@ def get_state_events(
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
room_state = yield self.state_store.get_state_for_events(
[membership_event_id], state_filter=state_filter
)
room_state = room_state[membership_event_id]
Expand Down
12 changes: 8 additions & 4 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def __init__(self, hs):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.state_store = self.storage.state
self.clock = hs.get_clock()
self._server_name = hs.hostname

Expand Down Expand Up @@ -125,7 +127,9 @@ def _purge_history(self, purge_id, room_id, token, delete_local_events):
self._purges_in_progress_by_room.add(room_id)
try:
with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(room_id, token, delete_local_events)
yield self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception:
Expand Down Expand Up @@ -168,7 +172,7 @@ async def purge_room(self, room_id):
if joined:
raise SynapseError(400, "Users are still joined to this room")

await self.store.purge_room(room_id)
await self.storage.purge_events.purge_room(room_id)

@defer.inlineCallbacks
def get_messages(
Expand Down Expand Up @@ -255,7 +259,7 @@ def get_messages(
events = event_filter.filter(events)

events = yield filter_events_for_client(
self.store, user_id, events, is_peeking=(member_event_id is None)
self.storage, user_id, events, is_peeking=(member_event_id is None)
)

if not events:
Expand All @@ -274,7 +278,7 @@ def get_messages(
(EventTypes.Member, event.sender) for event in events
)

state_ids = yield self.store.get_state_ids_for_event(
state_ids = yield self.state_store.get_state_ids_for_event(
events[0].event_id, state_filter=state_filter
)

Expand Down
6 changes: 4 additions & 2 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,8 @@ class RoomContextHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.state_store = self.storage.state

@defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit, event_filter):
Expand All @@ -848,7 +850,7 @@ def get_event_context(self, user, room_id, event_id, limit, event_filter):

def filter_evts(events):
return filter_events_for_client(
self.store, user.to_string(), events, is_peeking=is_peeking
self.storage, user.to_string(), events, is_peeking=is_peeking
)

event = yield self.store.get_event(
Expand Down Expand Up @@ -890,7 +892,7 @@ def filter_evts(events):
# first? Shouldn't we be consistent with /sync?
# https://github.com/matrix-org/matrix-doc/issues/687

state = yield self.store.get_state_for_events(
state = yield self.state_store.get_state_for_events(
[last_event_id], state_filter=state_filter
)
results["state"] = list(state[last_event_id].values())
Expand Down
12 changes: 7 additions & 5 deletions synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class SearchHandler(BaseHandler):
def __init__(self, hs):
super(SearchHandler, self).__init__(hs)
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
self.state_store = self.storage.state

@defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id):
Expand Down Expand Up @@ -221,7 +223,7 @@ def search(self, user, content, batch=None):
filtered_events = search_filter.filter([r["event"] for r in results])

events = yield filter_events_for_client(
self.store, user.to_string(), filtered_events
self.storage, user.to_string(), filtered_events
)

events.sort(key=lambda e: -rank_map[e.event_id])
Expand Down Expand Up @@ -271,7 +273,7 @@ def search(self, user, content, batch=None):
filtered_events = search_filter.filter([r["event"] for r in results])

events = yield filter_events_for_client(
self.store, user.to_string(), filtered_events
self.storage, user.to_string(), filtered_events
)

room_events.extend(events)
Expand Down Expand Up @@ -340,11 +342,11 @@ def search(self, user, content, batch=None):
)

res["events_before"] = yield filter_events_for_client(
self.store, user.to_string(), res["events_before"]
self.storage, user.to_string(), res["events_before"]
)

res["events_after"] = yield filter_events_for_client(
self.store, user.to_string(), res["events_after"]
self.storage, user.to_string(), res["events_after"]
)

res["start"] = now_token.copy_and_replace(
Expand Down Expand Up @@ -372,7 +374,7 @@ def search(self, user, content, batch=None):
[(EventTypes.Member, sender) for sender in senders]
)

state = yield self.store.get_state_for_event(
state = yield self.state_store.get_state_for_event(
last_event_id, state_filter
)

Expand Down
Loading