Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 3996cde

Browse files
committed
Send the thread ID over replication.
1 parent 16a9d70 commit 3996cde

File tree

3 files changed

+7
-6
lines changed

3 files changed

+7
-6
lines changed

synapse/replication/tcp/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -423,7 +423,7 @@ async def _on_new_receipts(
423423
receipt.receipt_type,
424424
receipt.user_id,
425425
[receipt.event_id],
426-
thread_id=None, # TODO
426+
thread_id=receipt.thread_id,
427427
data=receipt.data,
428428
)
429429
await self.federation_sender.send_read_receipt(receipt_info)

synapse/replication/tcp/streams/_base.py

+1
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ class ReceiptsStreamRow:
361361
receipt_type: str
362362
user_id: str
363363
event_id: str
364+
thread_id: Optional[str]
364365
data: dict
365366

366367
NAME = "receipts"

synapse/storage/databases/main/receipts.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,7 @@ def _get_users_sent_receipts_between_txn(txn: LoggingTransaction) -> List[str]:
526526

527527
async def get_all_updated_receipts(
528528
self, instance_name: str, last_id: int, current_id: int, limit: int
529-
) -> Tuple[List[Tuple[int, list]], int, bool]:
529+
) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool]:
530530
"""Get updates for receipts replication stream.
531531
532532
Args:
@@ -553,9 +553,9 @@ async def get_all_updated_receipts(
553553

554554
def get_all_updated_receipts_txn(
555555
txn: LoggingTransaction,
556-
) -> Tuple[List[Tuple[int, list]], int, bool]:
556+
) -> Tuple[List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]], int, bool]:
557557
sql = """
558-
SELECT stream_id, room_id, receipt_type, user_id, event_id, data
558+
SELECT stream_id, room_id, receipt_type, user_id, event_id, thread_id, data
559559
FROM receipts_linearized
560560
WHERE ? < stream_id AND stream_id <= ?
561561
ORDER BY stream_id ASC
@@ -564,8 +564,8 @@ def get_all_updated_receipts_txn(
564564
txn.execute(sql, (last_id, current_id, limit))
565565

566566
updates = cast(
567-
List[Tuple[int, list]],
568-
[(r[0], r[1:5] + (db_to_json(r[5]),)) for r in txn],
567+
List[Tuple[int, Tuple[str, str, str, str, str, JsonDict]]],
568+
[(r[0], r[1:6] + (db_to_json(r[6]),)) for r in txn],
569569
)
570570

571571
limited = False

0 commit comments

Comments
 (0)