12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
- from typing import TYPE_CHECKING , Any , Dict , Iterable , List , Tuple , cast
15
+ from typing import TYPE_CHECKING , Any , Dict , Iterable , List , Optional , Tuple , cast
16
16
17
17
from synapse .api .presence import PresenceState , UserPresenceState
18
18
from synapse .replication .tcp .streams import PresenceStream
22
22
LoggingDatabaseConnection ,
23
23
LoggingTransaction ,
24
24
)
25
+ from synapse .storage .databases .main .cache import CacheInvalidationWorkerStore
25
26
from synapse .storage .engines import PostgresEngine
26
27
from synapse .storage .types import Connection
27
28
from synapse .storage .util .id_generators import (
@@ -56,7 +57,7 @@ def __init__(
56
57
)
57
58
58
59
59
- class PresenceStore (PresenceBackgroundUpdateStore ):
60
+ class PresenceStore (PresenceBackgroundUpdateStore , CacheInvalidationWorkerStore ):
60
61
def __init__ (
61
62
self ,
62
63
database : DatabasePool ,
@@ -281,20 +282,30 @@ async def should_user_receive_full_presence_with_token(
281
282
True if the user should have full presence sent to them, False otherwise.
282
283
"""
283
284
284
- def _should_user_receive_full_presence_with_token_txn (
285
- txn : LoggingTransaction ,
286
- ) -> bool :
287
- sql = """
288
- SELECT 1 FROM users_to_send_full_presence_to
289
- WHERE user_id = ?
290
- AND presence_stream_id >= ?
291
- """
292
- txn .execute (sql , (user_id , from_token ))
293
- return bool (txn .fetchone ())
285
+ token = await self ._get_full_presence_stream_token_for_user (user_id )
286
+ if token is None :
287
+ return False
294
288
295
- return await self .db_pool .runInteraction (
296
- "should_user_receive_full_presence_with_token" ,
297
- _should_user_receive_full_presence_with_token_txn ,
289
+ return from_token <= token
290
+
291
+ @cached ()
292
+ async def _get_full_presence_stream_token_for_user (
293
+ self , user_id : str
294
+ ) -> Optional [int ]:
295
+ """Get the presence token corresponding to the last full presence update
296
+ for this user.
297
+
298
+ If the user presents a sync token with a presence stream token at least
299
+ as old as the result, then we need to send them a full presence update.
300
+
301
+ If this user has never needed a full presence update, returns `None`.
302
+ """
303
+ return await self .db_pool .simple_select_one_onecol (
304
+ table = "users_to_send_full_presence_to" ,
305
+ keyvalues = {"user_id" : user_id },
306
+ retcol = "presence_stream_id" ,
307
+ allow_none = True ,
308
+ desc = "_get_full_presence_stream_token_for_user" ,
298
309
)
299
310
300
311
async def add_users_to_send_full_presence_to (self , user_ids : Iterable [str ]) -> None :
@@ -307,18 +318,28 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]) -> N
307
318
# Add user entries to the table, updating the presence_stream_id column if the user already
308
319
# exists in the table.
309
320
presence_stream_id = self ._presence_id_gen .get_current_token ()
310
- await self .db_pool .simple_upsert_many (
311
- table = "users_to_send_full_presence_to" ,
312
- key_names = ("user_id" ,),
313
- key_values = [(user_id ,) for user_id in user_ids ],
314
- value_names = ("presence_stream_id" ,),
315
- # We save the current presence stream ID token along with the user ID entry so
316
- # that when a user /sync's, even if they syncing multiple times across separate
317
- # devices at different times, each device will receive full presence once - when
318
- # the presence stream ID in their sync token is less than the one in the table
319
- # for their user ID.
320
- value_values = [(presence_stream_id ,) for _ in user_ids ],
321
- desc = "add_users_to_send_full_presence_to" ,
321
+
322
+ def _add_users_to_send_full_presence_to (txn : LoggingTransaction ) -> None :
323
+ self .db_pool .simple_upsert_many_txn (
324
+ txn ,
325
+ table = "users_to_send_full_presence_to" ,
326
+ key_names = ("user_id" ,),
327
+ key_values = [(user_id ,) for user_id in user_ids ],
328
+ value_names = ("presence_stream_id" ,),
329
+ # We save the current presence stream ID token along with the user ID entry so
330
+ # that when a user /sync's, even if they syncing multiple times across separate
331
+ # devices at different times, each device will receive full presence once - when
332
+ # the presence stream ID in their sync token is less than the one in the table
333
+ # for their user ID.
334
+ value_values = [(presence_stream_id ,) for _ in user_ids ],
335
+ )
336
+ for user_id in user_ids :
337
+ self ._invalidate_cache_and_stream (
338
+ txn , self ._get_full_presence_stream_token_for_user , (user_id ,)
339
+ )
340
+
341
+ return await self .db_pool .runInteraction (
342
+ "add_users_to_send_full_presence_to" , _add_users_to_send_full_presence_to
322
343
)
323
344
324
345
async def get_presence_for_all_users (
0 commit comments