
Fix bug in account data replication stream. #7656

Merged · 4 commits · Jun 9, 2020
1 change: 1 addition & 0 deletions changelog.d/7656.bugfix
```diff
@@ -0,0 +1 @@
+Fix bug in account data replication stream.
```
8 changes: 7 additions & 1 deletion synapse/replication/slave/storage/account_data.py
```diff
@@ -24,7 +24,13 @@
 class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = SlavedIdTracker(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )
 
         super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)
```
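For context on `extra_tables`: the account data stream is fed by several tables (`account_data`, `room_account_data`, `room_tags_revisions`), so a tracker seeded from only one of them could start behind the true stream position. Passing the extra tables lets the current position be computed as the maximum `stream_id` across all of them. Below is a minimal sketch of that seeding behaviour using plain `sqlite3`; `IdTrackerSketch` is a hypothetical stand-in, not Synapse's `SlavedIdTracker`, which additionally advances as replication rows arrive:

```python
import sqlite3

class IdTrackerSketch:
    """Tracks a stream position seeded from the max ID over several tables."""

    def __init__(self, conn, table, column, extra_tables=()):
        self._current = 0
        for tbl, col in [(table, column), *extra_tables]:
            (max_id,) = conn.execute(f"SELECT MAX({col}) FROM {tbl}").fetchone()
            self._current = max(self._current, max_id or 0)

    def advance(self, stream_id):
        # Called for each row received over the replication stream.
        self._current = max(self._current, stream_id)

    def get_current_token(self):
        return self._current

conn = sqlite3.connect(":memory:")
for tbl in ("account_data", "room_account_data", "room_tags_revisions"):
    conn.execute(f"CREATE TABLE {tbl} (stream_id INTEGER)")
# A room tag write landed at stream position 7; nothing touched account_data.
conn.execute("INSERT INTO room_tags_revisions VALUES (7)")

tracker = IdTrackerSketch(
    conn,
    "account_data",
    "stream_id",
    extra_tables=[
        ("room_account_data", "stream_id"),
        ("room_tags_revisions", "stream_id"),
    ],
)
assert tracker.get_current_token() == 7  # the tag revision is not missed
```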
10 changes: 8 additions & 2 deletions synapse/replication/tcp/streams/_base.py
```diff
@@ -600,8 +600,14 @@ async def _update_function(
             for stream_id, user_id, room_id, account_data_type in room_results
         )
 
-        # we need to return a sorted list, so merge them together.
-        updates = list(heapq.merge(room_rows, global_rows))
+        # We need to return a sorted list, so merge them together.
+        #
+        # Note: We order only by the stream ID to work around a bug where the
+        # same stream ID could appear in both `global_rows` and `room_rows`,
+        # leading to a comparison between the data tuples. The comparison could
+        # fail due to attempting to compare the `room_id` which results in a
+        # `TypeError` from comparing a `str` vs `None`.
+        updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
         return updates, to_token, limited
```
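The `TypeError` described in the new comment is easy to reproduce: `heapq.merge` compares whole rows by default, so when two rows tie on `stream_id` and `user_id`, the comparison falls through to the `room_id` slot, where a room row's `str` meets a global row's `None`. A self-contained repro with made-up values, using the same `(stream_id, (user_id, room_id, data_type))` row shape as the code above:

```python
import heapq

room_rows = [(5, ("@alice:example.com", "!room:example.com", "m.tag"))]
global_rows = [(5, ("@alice:example.com", None, "m.push_rules"))]

try:
    # Default comparison: 5 == 5, "@alice..." == "@alice...", and then
    # "!room:example.com" vs None, which has no defined ordering.
    list(heapq.merge(room_rows, global_rows))
except TypeError as exc:
    print(exc)  # '<' not supported between instances of 'NoneType' and 'str'

# Keying on the stream ID alone never inspects the payload tuples, and the
# merge stays stable and sorted by that key.
merged = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
print(merged)  # both rows survive, ordered by stream ID
```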


41 changes: 7 additions & 34 deletions synapse/storage/data_stores/main/account_data.py
```diff
@@ -297,7 +297,13 @@ def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
 class AccountDataStore(AccountDataWorkerStore):
     def __init__(self, database: Database, db_conn, hs):
         self._account_data_id_gen = StreamIdGenerator(
-            db_conn, "account_data_max_stream_id", "stream_id"
+            db_conn,
+            "account_data_max_stream_id",
+            "stream_id",
+            extra_tables=[
+                ("room_account_data", "stream_id"),
+                ("room_tags_revisions", "stream_id"),
+            ],
         )
 
         super(AccountDataStore, self).__init__(database, db_conn, hs)
```
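This is the writer-side counterpart of the tracker change above: `StreamIdGenerator` now also seeds from the data tables, which is what makes it safe to stop writing to `account_data_max_stream_id` in the hunks below, since the high-water mark is implied by the rows themselves. A stripped-down sketch of the generator's contract; `StreamIdGeneratorSketch` is hypothetical, and the real class hands out IDs via a context manager and deals with in-flight writes:

```python
import threading

class StreamIdGeneratorSketch:
    """Hands out monotonically increasing stream IDs, seeded from the DB."""

    def __init__(self, conn, table, column, extra_tables=()):
        self._lock = threading.Lock()
        self._current = 0
        for tbl, col in [(table, column), *extra_tables]:
            (max_id,) = conn.execute(f"SELECT MAX({col}) FROM {tbl}").fetchone()
            self._current = max(self._current, max_id or 0)

    def get_next(self):
        # Allocate the next stream ID; the caller stamps its new row with it,
        # so the data tables always record the latest position implicitly.
        with self._lock:
            self._current += 1
            return self._current

    def get_current_token(self):
        return self._current
```

Because every write stamps its own row, no separate `UPDATE account_data_max_stream_id` statement is needed to persist the position, which is exactly the bookkeeping deleted in the remaining hunks.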
```diff
@@ -339,14 +345,6 @@ def add_account_data_to_room(self, user_id, room_id, account_data_type, content)
             lock=False,
         )
 
-        # it's theoretically possible for the above to succeed and the
-        # below to fail - in which case we might reuse a stream id on
-        # restart, and the above update might not get propagated. That
-        # doesn't sound any worse than the whole update getting lost,
-        # which is what would happen if we combined the two into one
-        # transaction.
-        yield self._update_max_stream_id(next_id)
-
         self._account_data_stream_cache.entity_has_changed(user_id, next_id)
         self.get_account_data_for_user.invalidate((user_id,))
         self.get_account_data_for_room.invalidate((user_id, room_id))
```
```diff
@@ -381,14 +379,6 @@ def add_account_data_for_user(self, user_id, account_data_type, content):
             lock=False,
         )
 
-        # it's theoretically possible for the above to succeed and the
-        # below to fail - in which case we might reuse a stream id on
-        # restart, and the above update might not get propagated. That
-        # doesn't sound any worse than the whole update getting lost,
-        # which is what would happen if we combined the two into one
-        # transaction.
-        yield self._update_max_stream_id(next_id)
-
         self._account_data_stream_cache.entity_has_changed(user_id, next_id)
         self.get_account_data_for_user.invalidate((user_id,))
         self.get_global_account_data_by_type_for_user.invalidate(
```
```diff
@@ -397,20 +387,3 @@ def add_account_data_for_user(self, user_id, account_data_type, content):
 
         result = self._account_data_id_gen.get_current_token()
         return result
-
-    def _update_max_stream_id(self, next_id):
-        """Update the max stream_id
-
-        Args:
-            next_id(int): The revision to advance to.
-        """
-
-        def _update(txn):
-            update_max_id_sql = (
-                "UPDATE account_data_max_stream_id"
-                " SET stream_id = ?"
-                " WHERE stream_id < ?"
-            )
-            txn.execute(update_max_id_sql, (next_id, next_id))
-
-        return self.db.runInteraction("update_account_data_max_stream_id", _update)
```
7 changes: 0 additions & 7 deletions synapse/storage/data_stores/main/tags.py
```diff
@@ -233,13 +233,6 @@ def _update_revision_txn(self, txn, user_id, room_id, next_id):
             self._account_data_stream_cache.entity_has_changed, user_id, next_id
         )
 
-        update_max_id_sql = (
-            "UPDATE account_data_max_stream_id"
-            " SET stream_id = ?"
-            " WHERE stream_id < ?"
-        )
-        txn.execute(update_max_id_sql, (next_id, next_id))
-
         update_sql = (
             "UPDATE room_tags_revisions"
             " SET stream_id = ?"
```