@@ -637,6 +637,7 @@ def _insert_linearized_receipt_txn(
637
637
receipt_type : str ,
638
638
user_id : str ,
639
639
event_id : str ,
640
+ thread_id : Optional [str ],
640
641
data : JsonDict ,
641
642
stream_id : int ,
642
643
) -> Optional [int ]:
@@ -663,12 +664,27 @@ def _insert_linearized_receipt_txn(
663
664
# We don't want to clobber receipts for more recent events, so we
664
665
# have to compare orderings of existing receipts
665
666
if stream_ordering is not None :
666
- sql = (
667
- "SELECT stream_ordering, event_id FROM events"
668
- " INNER JOIN receipts_linearized AS r USING (event_id, room_id)"
669
- " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
667
+ if thread_id is None :
668
+ thread_clause = "r.thread_id IS NULL"
669
+ thread_args : Tuple [str , ...] = ()
670
+ else :
671
+ thread_clause = "r.thread_id = ?"
672
+ thread_args = (thread_id ,)
673
+
674
+ sql = f"""
675
+ SELECT stream_ordering, event_id FROM events
676
+ INNER JOIN receipts_linearized AS r USING (event_id, room_id)
677
+ WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND { thread_clause }
678
+ """
679
+ txn .execute (
680
+ sql ,
681
+ (
682
+ room_id ,
683
+ receipt_type ,
684
+ user_id ,
685
+ )
686
+ + thread_args ,
670
687
)
671
- txn .execute (sql , (room_id , receipt_type , user_id ))
672
688
673
689
for so , eid in txn :
674
690
if int (so ) >= stream_ordering :
@@ -688,21 +704,28 @@ def _insert_linearized_receipt_txn(
688
704
self ._receipts_stream_cache .entity_has_changed , room_id , stream_id
689
705
)
690
706
707
+ keyvalues = {
708
+ "room_id" : room_id ,
709
+ "receipt_type" : receipt_type ,
710
+ "user_id" : user_id ,
711
+ }
712
+ where_clause = ""
713
+ if thread_id is None :
714
+ where_clause = "thread_id IS NULL"
715
+ else :
716
+ keyvalues ["thread_id" ] = thread_id
717
+
691
718
self .db_pool .simple_upsert_txn (
692
719
txn ,
693
720
table = "receipts_linearized" ,
694
- keyvalues = {
695
- "room_id" : room_id ,
696
- "receipt_type" : receipt_type ,
697
- "user_id" : user_id ,
698
- },
721
+ keyvalues = keyvalues ,
699
722
values = {
700
723
"stream_id" : stream_id ,
701
724
"event_id" : event_id ,
702
725
"event_stream_ordering" : stream_ordering ,
703
726
"data" : json_encoder .encode (data ),
704
- "thread_id" : None ,
705
727
},
728
+ where_clause = where_clause ,
706
729
# receipts_linearized has a unique constraint on
707
730
# (user_id, room_id, receipt_type), so no need to lock
708
731
lock = False ,
@@ -754,6 +777,7 @@ async def insert_receipt(
754
777
receipt_type : str ,
755
778
user_id : str ,
756
779
event_ids : List [str ],
780
+ thread_id : Optional [str ],
757
781
data : dict ,
758
782
) -> Optional [Tuple [int , int ]]:
759
783
"""Insert a receipt, either from local client or remote server.
@@ -786,6 +810,7 @@ async def insert_receipt(
786
810
receipt_type ,
787
811
user_id ,
788
812
linearized_event_id ,
813
+ thread_id ,
789
814
data ,
790
815
stream_id = stream_id ,
791
816
# Read committed is actually beneficial here because we check for a receipt with
@@ -800,7 +825,8 @@ async def insert_receipt(
800
825
801
826
now = self ._clock .time_msec ()
802
827
logger .debug (
803
- "RR for event %s in %s (%i ms old)" ,
828
+ "Receipt %s for event %s in %s (%i ms old)" ,
829
+ receipt_type ,
804
830
linearized_event_id ,
805
831
room_id ,
806
832
now - event_ts ,
@@ -813,6 +839,7 @@ async def insert_receipt(
813
839
receipt_type ,
814
840
user_id ,
815
841
event_ids ,
842
+ thread_id ,
816
843
data ,
817
844
)
818
845
@@ -827,6 +854,7 @@ def _insert_graph_receipt_txn(
827
854
receipt_type : str ,
828
855
user_id : str ,
829
856
event_ids : List [str ],
857
+ thread_id : Optional [str ],
830
858
data : JsonDict ,
831
859
) -> None :
832
860
assert self ._can_write_to_receipts
@@ -838,19 +866,26 @@ def _insert_graph_receipt_txn(
838
866
# FIXME: This shouldn't invalidate the whole cache
839
867
txn .call_after (self ._get_linearized_receipts_for_room .invalidate , (room_id ,))
840
868
869
+ keyvalues = {
870
+ "room_id" : room_id ,
871
+ "receipt_type" : receipt_type ,
872
+ "user_id" : user_id ,
873
+ }
874
+ where_clause = ""
875
+ if thread_id is None :
876
+ where_clause = "thread_id IS NULL"
877
+ else :
878
+ keyvalues ["thread_id" ] = thread_id
879
+
841
880
self .db_pool .simple_upsert_txn (
842
881
txn ,
843
882
table = "receipts_graph" ,
844
- keyvalues = {
845
- "room_id" : room_id ,
846
- "receipt_type" : receipt_type ,
847
- "user_id" : user_id ,
848
- },
883
+ keyvalues = keyvalues ,
849
884
values = {
850
885
"event_ids" : json_encoder .encode (event_ids ),
851
886
"data" : json_encoder .encode (data ),
852
- "thread_id" : None ,
853
887
},
888
+ where_clause = where_clause ,
854
889
# receipts_graph has a unique constraint on
855
890
# (user_id, room_id, receipt_type), so no need to lock
856
891
lock = False ,
0 commit comments