Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add local_current_membership table #6655

Merged
merged 11 commits into from
Jan 15, 2020
1 change: 1 addition & 0 deletions changelog.d/6655.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `local_current_membership` table for tracking local user membership state in rooms.
2 changes: 1 addition & 1 deletion scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ class Porter(object):
engine.check_database(
db_conn, allow_outdated_version=allow_outdated_version
)
prepare_database(db_conn, engine, config=None)
prepare_database(db_conn, engine, config=self.hs_config)
store = Store(Database(hs, db_config, engine), db_conn, hs)
db_conn.commit()

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ async def export_user_data(self, user_id, writer):
The returned value is that returned by `writer.finished()`.
"""
# Get all rooms the user is in or has been in
rooms = await self.store.get_rooms_for_user_where_membership_is(
rooms = await self.store.get_rooms_for_local_user_where_membership_is(
user_id,
membership_list=(
Membership.JOIN,
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/deactivate_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async def _reject_pending_invites_for_user(self, user_id):
user_id (str): The user ID to reject pending invites for.
"""
user = UserID.from_string(user_id)
pending_invites = await self.store.get_invited_rooms_for_user(user_id)
pending_invites = await self.store.get_invited_rooms_for_local_user(user_id)

for room in pending_invites:
try:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async def _snapshot_all_rooms(
if include_archived:
memberships.append(Membership.LEAVE)

room_list = await self.store.get_rooms_for_user_where_membership_is(
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id, membership_list=memberships
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ def lookup_room_alias(self, room_alias):

@defer.inlineCallbacks
def _get_inviter(self, user_id, room_id):
invite = yield self.store.get_invite_for_user_in_room(
invite = yield self.store.get_invite_for_local_user_in_room(
user_id=user_id, room_id=room_id
)
if invite:
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def search(self, user, content, batch=None):
search_filter = Filter(filter_dict)

# TODO: Search through left rooms too
rooms = yield self.store.get_rooms_for_user_where_membership_is(
rooms = yield self.store.get_rooms_for_local_user_where_membership_is(
user.to_string(),
membership_list=[Membership.JOIN],
# membership_list=[Membership.JOIN, Membership.LEAVE, Membership.Ban],
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -1662,7 +1662,7 @@ async def _get_all_rooms(self, sync_result_builder, ignored_users):
Membership.BAN,
)

room_list = await self.store.get_rooms_for_user_where_membership_is(
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
user_id=user_id, membership_list=membership_list
)

Expand Down
2 changes: 1 addition & 1 deletion synapse/push/push_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

@defer.inlineCallbacks
def get_badge_count(store, user_id):
invites = yield store.get_invited_rooms_for_user(user_id)
invites = yield store.get_invited_rooms_for_local_user(user_id)
joins = yield store.get_rooms_for_user(user_id)

my_receipts_by_room = yield store.get_receipts_for_user(user_id, "m.read")
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def invalidate_caches_for_event(

if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
self.get_invited_rooms_for_user.invalidate((state_key,))
self.get_invited_rooms_for_local_user.invalidate((state_key,))

if relates_to:
self.get_relations_for_event.invalidate_many((relates_to,))
Expand Down
2 changes: 1 addition & 1 deletion synapse/server_notices/server_notices_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def get_notice_room_for_user(self, user_id):

assert self._is_mine_id(user_id), "Cannot send server notices to remote users"

rooms = yield self._store.get_rooms_for_user_where_membership_is(
rooms = yield self._store.get_rooms_for_local_user_where_membership_is(
user_id, [Membership.INVITE, Membership.JOIN]
)
system_mxid = self._config.server_notices_mxid
Expand Down
30 changes: 30 additions & 0 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ def _censor_redactions():
hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)

self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
self.is_mine_id = hs.is_mine_id

@defer.inlineCallbacks
def _read_forward_extremities(self):
Expand Down Expand Up @@ -547,6 +548,34 @@ def _update_current_state_txn(self, txn, state_delta_by_room, stream_id):
],
)

# Note: Do we really want to delete rows here (that we do not
# subsequently reinsert below)? While technically correct it means
# we have no record of the fact the user *was* a member of the
# room but got, say, state reset out of it.
if to_delete or to_insert:
txn.executemany(
"DELETE FROM local_current_membership"
" WHERE room_id = ? AND user_id = ?",
(
(room_id, state_key)
for etype, state_key in itertools.chain(to_delete, to_insert)
if etype == EventTypes.Member and self.is_mine_id(state_key)
),
)

if to_insert:
txn.executemany(
"""INSERT INTO local_current_membership
(room_id, user_id, event_id, membership)
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
""",
[
(room_id, key[1], ev_id, ev_id)
for key, ev_id in to_insert.items()
if key[0] == EventTypes.Member and self.is_mine_id(key[1])
],
)

txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id,
Expand Down Expand Up @@ -1724,6 +1753,7 @@ def _purge_room_txn(self, txn, room_id):
"local_invites",
"room_account_data",
"room_tags",
"local_current_membership",
):
logger.info("[purge] removing %s from %s", room_id, table)
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
Expand Down
Loading