Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Bugs/syn 264 #67

Merged
merged 2 commits into from
Feb 11, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 47 additions & 18 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@

class RoomMemberStore(SQLBaseStore):

def __init__(self, *args, **kw):
super(RoomMemberStore, self).__init__(*args, **kw)

self._user_rooms_cache = {}

def _store_room_member_txn(self, txn, event):
"""Store a room member in the database.
"""
Expand Down Expand Up @@ -98,6 +103,8 @@ def _store_room_member_txn(self, txn, event):

txn.execute(sql, (event.room_id, domain))

self.invalidate_rooms_for_user(target_user_id)

@defer.inlineCallbacks
def get_room_member(self, user_id, room_id):
"""Retrieve the current state of a room member.
Expand Down Expand Up @@ -240,28 +247,50 @@ def _get_members_query_txn(self, txn, where_clause, where_values):
results = self._parse_events_txn(txn, rows)
return results

# TODO(paul): Create a nice @cached decorator to do this
# @cached
# def get_foo(...)
# ...
# invalidate_foo = get_foo.invalidator

@defer.inlineCallbacks
def get_rooms_for_user(self, user_id):
# TODO(paul): put some performance counters in here so we can easily
# track what impact this cache is having
if user_id in self._user_rooms_cache:
defer.returnValue(self._user_rooms_cache[user_id])

rooms = yield self.get_rooms_for_user_where_membership_is(
user_id, membership_list=[Membership.JOIN],
)

self._user_rooms_cache[user_id] = rooms
defer.returnValue(rooms)

def invalidate_rooms_for_user(self, user_id):
if user_id in self._user_rooms_cache:
del self._user_rooms_cache[user_id]

@defer.inlineCallbacks
def user_rooms_intersect(self, user_id_list):
""" Checks whether all the users whose IDs are given in a list share a
room.

This is a "hot path" function that's called a lot, e.g. by presence for
generating the event stream. As such, it is implemented locally by
wrapping logic around heavily-cached database queries.
"""
def interaction(txn):
user_list_clause = " OR ".join(["m.user_id = ?"] * len(user_id_list))
sql = (
"SELECT m.room_id FROM room_memberships as m "
"INNER JOIN current_state_events as c "
"ON m.event_id = c.event_id "
"WHERE m.membership = 'join' "
"AND (%(clause)s) "
# TODO(paul): We've got duplicate rows in the database somewhere
# so we have to DISTINCT m.user_id here
"GROUP BY m.room_id HAVING COUNT(DISTINCT m.user_id) = ?"
) % {"clause": user_list_clause}

args = list(user_id_list)
args.append(len(user_id_list))
if len(user_id_list) < 2:
defer.returnValue(True)

txn.execute(sql, args)
deferreds = [self.get_rooms_for_user(u) for u in user_id_list]

results = yield defer.DeferredList(deferreds)

# A list of sets of strings giving room IDs for each user
room_id_lists = [set([r.room_id for r in result[1]]) for result in results]

return len(txn.fetchall()) > 0
# There isn't a setintersection(*list_of_sets)
ret = len(room_id_lists.pop(0).intersection(*room_id_lists)) > 0

return self.runInteraction("user_rooms_intersect", interaction)
defer.returnValue(ret)