Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
rewrite _send_events_for_new_room to batch events for persisting
Browse files Browse the repository at this point in the history
  • Loading branch information
H-Shay committed Sep 21, 2022
1 parent d573939 commit 3ece6b4
Showing 1 changed file with 68 additions and 46 deletions.
114 changes: 68 additions & 46 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,9 @@ async def _send_events_for_new_room(
creator_join_profile: Optional[JsonDict] = None,
ratelimit: bool = True,
) -> Tuple[int, str, int]:
"""Sends the initial events into a new room.
"""Sends the initial events into a new room. Sends the room creation, membership,
and power level events into the room sequentially, then creates and batches up the
rest of the events to persist as a batch to the DB.
`power_level_content_override` doesn't apply when initial state has
power level state event content.
Expand All @@ -1053,11 +1055,19 @@ async def _send_events_for_new_room(
"""

creator_id = creator.user.to_string()

event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}

depth = 1
# the last event sent/persisted to the db
last_sent_event_id: Optional[str] = None
# the most recently created event
prev_event: List[str] = []
# a map of event types, state keys -> event_ids. We collect these mappings this as events are
# created (but not persisted to the db) to determine state for future created events
# (as this info can't be pulled from the db)
state_map: dict = {}
# current_state_group of last event created. Used for computing event context of
# events to be batched
current_state_group = None

def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
e = {"type": etype, "content": content}
Expand All @@ -1068,32 +1078,37 @@ def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
return e

async def create_event(
etype: str, content: JsonDict, **kwargs: Any
etype: str,
content: JsonDict,
for_batch: bool,
**kwargs: Any,
) -> Tuple[EventBase, synapse.events.snapshot.EventContext]:
nonlocal last_sent_event_id
nonlocal depth
nonlocal prev_event

event_dict = create_event_dict(etype, content, **kwargs)

event, context = await self.event_creation_handler.create_event(
new_event, new_context = await self.event_creation_handler.create_event(
creator,
event_dict,
prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
prev_event_ids=prev_event,
depth=depth,
state_map=state_map,
for_batch=for_batch,
current_state_group=current_state_group,
)
depth += 1
prev_event = [new_event.event_id]
state_map[(new_event.type, new_event.state_key)] = new_event.event_id

return event, context
return new_event, new_context

async def send(
event: EventBase,
context: synapse.events.snapshot.EventContext,
creator: Requester,
) -> int:
nonlocal last_sent_event_id
nonlocal depth
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
event.sender,
)

ev = await self.event_creation_handler.handle_new_client_event(
requester=creator,
Expand All @@ -1104,7 +1119,6 @@ async def send(
)

last_sent_event_id = ev.event_id
depth += 1

# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
Expand All @@ -1119,7 +1133,7 @@ async def send(

creation_content.update({"creator": creator_id})
creation_event, creation_context = await create_event(
etype=EventTypes.Create, content=creation_content
EventTypes.Create, creation_content, False
)

logger.debug("Sending %s in new room", EventTypes.Member)
Expand All @@ -1139,14 +1153,21 @@ async def send(
depth=depth,
)
last_sent_event_id = member_event_id
prev_event = [member_event_id]

# update the depth and state map here as the membership event has been created
# through a different code path
depth += 1
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id

# We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
if pl_content is not None:
power_event, power_context = await create_event(
etype=EventTypes.PowerLevels, content=pl_content
EventTypes.PowerLevels, pl_content, False
)
current_state_group = power_context._state_group
last_sent_stream_id = await send(power_event, power_context, creator)
else:
power_level_content: JsonDict = {
Expand Down Expand Up @@ -1190,67 +1211,68 @@ async def send(
# apply those.
if power_level_content_override:
power_level_content.update(power_level_content_override)

pl_event, pl_context = await create_event(
etype=EventTypes.PowerLevels, content=power_level_content
EventTypes.PowerLevels,
power_level_content,
False,
)
current_state_group = pl_context._state_group
last_sent_stream_id = await send(pl_event, pl_context, creator)

events_to_send = []
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
room_alias_event, room_alias_context = await create_event(
etype=EventTypes.CanonicalAlias,
content={"alias": room_alias.to_string()},
)
last_sent_stream_id = await send(
room_alias_event, room_alias_context, creator
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
)
current_state_group = room_alias_context._state_group
events_to_send.append((room_alias_event, room_alias_context))

if (EventTypes.JoinRules, "") not in initial_state:
join_rules_event, join_rules_context = await create_event(
etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
)
last_sent_stream_id = await send(
join_rules_event, join_rules_context, creator
EventTypes.JoinRules,
{"join_rule": config["join_rules"]},
True,
)
current_state_group = join_rules_context._state_group
events_to_send.append((join_rules_event, join_rules_context))

if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
visibility_event, visibility_context = await create_event(
etype=EventTypes.RoomHistoryVisibility,
content={"history_visibility": config["history_visibility"]},
)
last_sent_stream_id = await send(
visibility_event, visibility_context, creator
EventTypes.RoomHistoryVisibility,
{"history_visibility": config["history_visibility"]},
True,
)
current_state_group = visibility_context._state_group
events_to_send.append((visibility_event, visibility_context))

if config["guest_can_join"]:
if (EventTypes.GuestAccess, "") not in initial_state:
guest_access_event, guest_access_context = await create_event(
etype=EventTypes.GuestAccess,
content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
)
last_sent_stream_id = await send(
guest_access_event, guest_access_context, creator
EventTypes.GuestAccess,
{EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
True,
)
current_state_group = guest_access_context._state_group
events_to_send.append((guest_access_event, guest_access_context))

events = []
for (etype, state_key), content in initial_state.items():
event, context = await create_event(
etype=etype, state_key=state_key, content=content
etype, content, True, state_key=state_key
)
events.append((event, context))
for event, context in events:
last_sent_stream_id = await send(event, context, creator)
current_state_group = context._state_group
events_to_send.append((event, context))

if config["encrypted"]:
encryption_event, encryption_context = await create_event(
etype=EventTypes.RoomEncryption,
EventTypes.RoomEncryption,
{"algorithm": RoomEncryptionAlgorithms.DEFAULT},
True,
state_key="",
content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
)
last_sent_stream_id = await send(
encryption_event, encryption_context, creator
)
events_to_send.append((encryption_event, encryption_context))

for event, context in events_to_send:
last_sent_stream_id = await send(event, context, creator)
return last_sent_stream_id, last_sent_event_id, depth

def _generate_room_id(self) -> str:
Expand Down

0 comments on commit 3ece6b4

Please sign in to comment.