-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Refactor _resolve_state_at_missing_prevs
to return an EventContext
#13404
Changes from 1 commit
bd13f9c
c1474ec
5e62cd0
2e4b3f4
d513768
d13c8ab
da6270b
11e7af1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -278,19 +278,15 @@ async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None: | |
) | ||
|
||
try: | ||
await self._process_received_pdu( | ||
origin, pdu, state_ids=None, partial_state=None | ||
) | ||
await self._process_received_pdu(origin, pdu, context=None) | ||
except PartialStateConflictError: | ||
# The room was un-partial stated while we were processing the PDU. | ||
# Try once more, with full state this time. | ||
logger.info( | ||
"Room %s was un-partial stated while processing the PDU, trying again.", | ||
room_id, | ||
) | ||
await self._process_received_pdu( | ||
origin, pdu, state_ids=None, partial_state=None | ||
) | ||
await self._process_received_pdu(origin, pdu, context=None) | ||
|
||
async def on_send_membership_event( | ||
self, origin: str, event: EventBase | ||
|
@@ -539,36 +535,10 @@ async def update_state_for_partial_state_event( | |
# | ||
# This is the same operation as we do when we receive a regular event | ||
# over federation. | ||
state_ids, partial_state = await self._resolve_state_at_missing_prevs( | ||
context = await self._compute_event_context_with_missing_prevs( | ||
destination, event | ||
) | ||
|
||
# There are three possible cases for (state_ids, partial_state): | ||
# * `state_ids` and `partial_state` are both `None` if we had all the | ||
# prev_events. The prev_events may or may not have partial state and | ||
# we won't know until we compute the event context. | ||
# * `state_ids` is not `None` and `partial_state` is `False` if we were | ||
# missing some prev_events (but we have full state for any we did | ||
# have). We calculated the full state after the prev_events. | ||
# * `state_ids` is not `None` and `partial_state` is `True` if we were | ||
# missing some, but not all, prev_events. At least one of the | ||
# prev_events we did have had partial state, so we calculated a partial | ||
# state after the prev_events. | ||
|
||
context = None | ||
if state_ids is not None and partial_state: | ||
# the state after the prev events is still partial. We can't de-partial | ||
# state the event, so don't bother building the event context. | ||
pass | ||
else: | ||
# build a new state group for it if need be | ||
context = await self._state_handler.compute_event_context( | ||
event, | ||
state_ids_before_event=state_ids, | ||
partial_state=partial_state, | ||
) | ||
|
||
if context is None or context.partial_state: | ||
if context.partial_state: | ||
# this can happen if some or all of the event's prev_events still have | ||
# partial state. We were careful to only pick events from the db without | ||
# partial-state prev events, so that implies that a prev event has | ||
|
@@ -834,26 +804,25 @@ async def _process_pulled_event( | |
|
||
try: | ||
try: | ||
state_ids, partial_state = await self._resolve_state_at_missing_prevs( | ||
context = await self._compute_event_context_with_missing_prevs( | ||
origin, event | ||
) | ||
await self._process_received_pdu( | ||
origin, | ||
event, | ||
state_ids=state_ids, | ||
partial_state=partial_state, | ||
context=context, | ||
backfilled=backfilled, | ||
) | ||
except PartialStateConflictError: | ||
# The room was un-partial stated while we were processing the event. | ||
# Try once more, with full state this time. | ||
state_ids, partial_state = await self._resolve_state_at_missing_prevs( | ||
context = await self._compute_event_context_with_missing_prevs( | ||
origin, event | ||
) | ||
|
||
# We ought to have full state now, barring some unlikely race where we left and | ||
# rejoned the room in the background. | ||
if state_ids is not None and partial_state: | ||
if context.partial_state: | ||
raise AssertionError( | ||
f"Event {event.event_id} still has a partial resolved state " | ||
f"after room {event.room_id} was un-partial stated" | ||
|
@@ -862,8 +831,7 @@ async def _process_pulled_event( | |
await self._process_received_pdu( | ||
origin, | ||
event, | ||
state_ids=state_ids, | ||
partial_state=partial_state, | ||
context=context, | ||
backfilled=backfilled, | ||
) | ||
except FederationError as e: | ||
|
@@ -872,15 +840,18 @@ async def _process_pulled_event( | |
else: | ||
raise | ||
|
||
async def _resolve_state_at_missing_prevs( | ||
async def _compute_event_context_with_missing_prevs( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. given we don't know if we have missing prevs, this is a bit of a confusing name (notwithstanding the old name being just as confusing). Unfortunately I can't think of a better name than |
||
self, dest: str, event: EventBase | ||
) -> Tuple[Optional[StateMap[str]], Optional[bool]]: | ||
"""Calculate the state at an event with missing prev_events. | ||
) -> EventContext: | ||
"""Build an EventContext structure for a non-outlier event whose prev_events may | ||
be missing. | ||
|
||
This is used when we have pulled a batch of events from a remote server, and | ||
still don't have all the prev_events. | ||
This is used when we have pulled a batch of events from a remote server, and may | ||
not have all the prev_events. | ||
|
||
If we already have all the prev_events for `event`, this method does nothing. | ||
To build an EventContext, we need to calculate the state at the event. If we | ||
already have all the prev_events for `event`, we can simply use the state at the | ||
prev_events to calculate the state at `event`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/state at/state before/ |
||
|
||
Otherwise, the missing prevs become new backwards extremities, and we fall back | ||
to asking the remote server for the state after each missing `prev_event`, | ||
|
@@ -901,10 +872,7 @@ async def _resolve_state_at_missing_prevs( | |
event: an event to check for missing prevs. | ||
|
||
Returns: | ||
if we already had all the prev events, `None, None`. Otherwise, returns a | ||
tuple containing: | ||
* the event ids of the state at `event`. | ||
* a boolean indicating whether the state may be partial. | ||
The event context. | ||
|
||
Raises: | ||
FederationError if we fail to get the state from the remote server after any | ||
|
@@ -918,7 +886,7 @@ async def _resolve_state_at_missing_prevs( | |
missing_prevs = prevs - seen | ||
|
||
if not missing_prevs: | ||
return None, None | ||
return await self._state_handler.compute_event_context(event) | ||
|
||
logger.info( | ||
"Event %s is missing prev_events %s: calculating state for a " | ||
|
@@ -984,7 +952,9 @@ async def _resolve_state_at_missing_prevs( | |
"We can't get valid state history.", | ||
affected=event_id, | ||
) | ||
return state_map, partial_state | ||
return await self._state_handler.compute_event_context( | ||
event, state_ids_before_event=state_map, partial_state=partial_state | ||
) | ||
|
||
async def _get_state_ids_after_missing_prev_event( | ||
self, | ||
|
@@ -1153,8 +1123,7 @@ async def _process_received_pdu( | |
self, | ||
origin: str, | ||
event: EventBase, | ||
state_ids: Optional[StateMap[str]], | ||
partial_state: Optional[bool], | ||
context: Optional[EventContext], | ||
backfilled: bool = False, | ||
) -> None: | ||
"""Called when we have a new non-outlier event. | ||
|
@@ -1176,32 +1145,22 @@ async def _process_received_pdu( | |
|
||
event: event to be persisted | ||
|
||
state_ids: Normally None, but if we are handling a gap in the graph | ||
(ie, we are missing one or more prev_events), the resolved state at the | ||
event | ||
|
||
partial_state: | ||
`True` if `state_ids` is partial and omits non-critical membership | ||
events. | ||
`False` if `state_ids` is the full state. | ||
`None` if `state_ids` is not provided. In this case, the flag will be | ||
calculated based on `event`'s prev events. | ||
context: The `EventContext` to persist the event with. | ||
|
||
backfilled: True if this is part of a historical batch of events (inhibits | ||
notification to clients, and validation of device keys.) | ||
|
||
PartialStateConflictError: if the room was un-partial stated in between | ||
computing the state at the event and persisting it. The caller should retry | ||
exactly once in this case. | ||
exactly once in this case. If a `context` was provided, it should be | ||
recomputed. | ||
""" | ||
logger.debug("Processing event: %s", event) | ||
assert not event.internal_metadata.outlier | ||
|
||
context = await self._state_handler.compute_event_context( | ||
event, | ||
state_ids_before_event=state_ids, | ||
partial_state=partial_state, | ||
) | ||
if context is None: | ||
context = await self._state_handler.compute_event_context(event) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like it would be cleaner to make |
||
|
||
try: | ||
await self._check_event_auth(origin, event, context) | ||
except AuthError as e: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We lose the optimization here in exchange for simpler code.