Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prep work for removing outlier from internal_metadata #9411

Merged
merged 5 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9411.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Preparatory steps for removing redundant `outlier` data from `event_json.internal_metadata` column.
9 changes: 6 additions & 3 deletions synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def __get__(self, instance, owner=None):


class _EventInternalMetadata:
__slots__ = ["_dict", "stream_ordering"]
__slots__ = ["_dict", "stream_ordering", "outlier"]

def __init__(self, internal_metadata_dict: JsonDict):
# we have to copy the dict, because it turns out that the same dict is
Expand All @@ -108,7 +108,10 @@ def __init__(self, internal_metadata_dict: JsonDict):
# the stream ordering of this event. None, until it has been persisted.
self.stream_ordering = None # type: Optional[int]

outlier = DictProperty("outlier") # type: bool
# whether this event is an outlier (ie, whether we have the state at that point
# in the DAG)
self.outlier = False

out_of_band_membership = DictProperty("out_of_band_membership") # type: bool
send_on_behalf_of = DictProperty("send_on_behalf_of") # type: str
recheck_redaction = DictProperty("recheck_redaction") # type: bool
Expand All @@ -129,7 +132,7 @@ def get_dict(self) -> JsonDict:
return dict(self._dict)

def is_outlier(self) -> bool:
return self._dict.get("outlier", False)
return self.outlier

def is_out_of_band_membership(self) -> bool:
"""Whether this is an out of band membership, like an invite or an invite
Expand Down
2 changes: 2 additions & 0 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def prune_event(event: EventBase) -> EventBase:
event.internal_metadata.stream_ordering
)

pruned_event.internal_metadata.outlier = event.internal_metadata.outlier

# Mark the event as redacted
pruned_event.internal_metadata.redacted = True

Expand Down
3 changes: 3 additions & 0 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
// containing the event
"event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
"outlier": true|false,
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
}],
Expand Down Expand Up @@ -84,6 +85,7 @@ async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"outlier": event.internal_metadata.is_outlier(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
}
Expand Down Expand Up @@ -116,6 +118,7 @@ async def _handle_request(self, request):
event = make_event_from_dict(
event_dict, room_ver, internal_metadata, rejected_reason
)
event.internal_metadata.outlier = event_payload["outlier"]

context = EventContext.deserialize(
self.storage, event_payload["context"]
Expand Down
4 changes: 3 additions & 1 deletion synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
// containing the event
"event_format_version": .., // 1,2,3 etc: the event format version
"internal_metadata": { .. serialized internal_metadata .. },
"outlier": true|false,
"rejected_reason": .., // The event.rejected_reason field
"context": { .. serialized event context .. },
"requester": { .. serialized requester .. },
Expand Down Expand Up @@ -79,14 +80,14 @@ async def _serialize_payload(
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""

serialized_context = await context.serialize(event, store)

payload = {
"event": event.get_pdu_json(),
"room_version": event.room_version.identifier,
"event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"outlier": event.internal_metadata.is_outlier(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
Expand All @@ -108,6 +109,7 @@ async def _handle_request(self, request, event_id):
event = make_event_from_dict(
event_dict, room_ver, internal_metadata, rejected_reason
)
event.internal_metadata.outlier = content["outlier"]

requester = Requester.deserialize(self.store, content["requester"])
context = EventContext.deserialize(self.storage, content["context"])
Expand Down
19 changes: 17 additions & 2 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,8 +1270,10 @@ def _update_outliers_txn(self, txn, events_and_contexts):
logger.exception("")
raise

# update the stored internal_metadata to update the "outlier" flag.
# TODO: This is unused as of Synapse 1.31. Remove it once we are happy
# to drop backwards-compatibility with 1.30.
metadata_json = json_encoder.encode(event.internal_metadata.get_dict())

sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))

Expand Down Expand Up @@ -1319,6 +1321,19 @@ def event_dict(event):
d.pop("redacted_because", None)
return d

def get_internal_metadata(event):
im = event.internal_metadata.get_dict()

# temporary hack for database compatibility with Synapse 1.30 and earlier:
# store the `outlier` flag inside the internal_metadata json as well as in
# the `events` table, so that if anyone rolls back to an older Synapse,
# things keep working. This can be removed once we are happy to drop support
# for that
if event.internal_metadata.is_outlier():
im["outlier"] = True

return im

self.db_pool.simple_insert_many_txn(
txn,
table="event_json",
Expand All @@ -1327,7 +1342,7 @@ def event_dict(event):
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": json_encoder.encode(
event.internal_metadata.get_dict()
get_internal_metadata(event)
),
"json": json_encoder.encode(event_dict(event)),
"format_version": event.format_version,
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False):
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row["stream_ordering"]
original_ev.internal_metadata.outlier = row["outlier"]

event_map[event_id] = original_ev

Expand Down Expand Up @@ -905,7 +906,8 @@ def _fetch_event_rows(self, txn, event_ids):
ej.json,
ej.format_version,
r.room_version,
rej.reason
rej.reason,
e.outlier
FROM events AS e
JOIN event_json AS ej USING (event_id)
LEFT JOIN rooms r ON r.room_id = e.room_id
Expand All @@ -929,6 +931,7 @@ def _fetch_event_rows(self, txn, event_ids):
"room_version_id": row[5],
"rejected_reason": row[6],
"redactions": [],
"outlier": row[7],
}

# check for redactions
Expand Down