Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Share logic between being able to list rooms and delete aliases.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Feb 27, 2020
1 parent 3ba2161 commit 9279219
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 45 deletions.
9 changes: 2 additions & 7 deletions synapse/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ def compute_auth_events(

@defer.inlineCallbacks
def check_can_change_room_list(self, room_id: str, user: UserID):
"""Check if the user is allowed to edit the room's entry in the
"""Determine whether the user is allowed to edit the room's entry in the
published room list.
Args:
Expand Down Expand Up @@ -570,12 +570,7 @@ def check_can_change_room_list(self, room_id: str, user: UserID):
)
user_level = event_auth.get_user_power_level(user_id, auth_events)

if user_level < send_level:
raise AuthError(
403,
"This server requires you to be a moderator in the room to"
" edit its room list entry",
)
return user_level >= send_level

@staticmethod
def has_access_token(request):
Expand Down
33 changes: 12 additions & 21 deletions synapse/handlers/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from twisted.internet import defer

from synapse import event_auth
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
from synapse.api.errors import (
AuthError,
Expand Down Expand Up @@ -390,32 +389,16 @@ def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
if creator is not None and creator == user_id:
return True

is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
if is_admin:
return True

# Resolve the alias to the corresponding room.
room_mapping = yield self.get_association(alias)
room_id = room_mapping["room_id"]
if not room_id:
return False

# Check if the user has sufficient power-level to send a canonical alias
# event.
power_level_event = yield self.state.get_current_state(
room_id, EventTypes.PowerLevels, ""
)

auth_events = {}
if power_level_event:
auth_events[(EventTypes.PowerLevels, "")] = power_level_event

send_level = event_auth.get_send_level(
EventTypes.CanonicalAlias, "", power_level_event
res = yield self.auth.check_can_change_room_list(
room_id, UserID.from_string(user_id)
)
user_level = event_auth.get_user_power_level(user_id, auth_events)

return user_level >= send_level
return res

@defer.inlineCallbacks
def edit_published_room_list(
Expand Down Expand Up @@ -450,7 +433,15 @@ def edit_published_room_list(
if room is None:
raise SynapseError(400, "Unknown room")

yield self.auth.check_can_change_room_list(room_id, requester.user)
can_change_room_list = yield self.auth.check_can_change_room_list(
room_id, requester.user
)
if not can_change_room_list:
raise AuthError(
403,
"This server requires you to be a moderator in the room to"
" edit its room list entry",
)

making_public = visibility == "public"
if making_public:
Expand Down
39 changes: 22 additions & 17 deletions tests/handlers/test_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ def prepare(self, reactor, clock, hs):
self.test_alias = "#test:test"
self.room_alias = RoomAlias.from_string(self.test_alias)

# Create a test user.
self.test_user = self.register_user("user", "pass", admin=False)
self.test_user_tok = self.login("user", "pass")
self.helper.join(room=self.room_id, user=self.test_user, tok=self.test_user_tok)

def _create_alias(self, user):
# Create a new alias to this room.
self.get_success(
Expand All @@ -140,44 +145,44 @@ def test_delete_alias_not_allowed(self):
self._create_alias(self.admin_user)
self.get_failure(
self.handler.delete_association(
create_requester("@user:test"), self.room_alias
create_requester(self.test_user), self.room_alias
),
synapse.api.errors.AuthError,
)

def test_delete_alias_creator(self):
"""An alias creator can delete the alias."""
"""An alias creator can delete their own alias."""
# Create an alias from a different user.
user_id = "@user:test"
self._create_alias(user_id)
self._create_alias(self.test_user)

# Deleting the alias completes successfully.
# Delete the user's alias.
result = self.get_success(
self.handler.delete_association(create_requester(user_id), self.room_alias)
self.handler.delete_association(
create_requester(self.test_user), self.room_alias
)
)
self.assertEquals(self.room_id, result)

# The alias should not be found.
# Confirm the alias is gone.
self.get_failure(
self.handler.get_association(self.room_alias),
synapse.api.errors.SynapseError,
)

def test_delete_alias_admin(self):
"""A server admin can delete an alias."""
"""A server admin can delete an alias created by another user."""
# Create an alias from a different user.
user_id = "@user:test"
self._create_alias(user_id)
self._create_alias(self.test_user)

# Deleting the alias as the admin completes successfully.
# Delete the user's alias as the admin.
result = self.get_success(
self.handler.delete_association(
create_requester(self.admin_user), self.room_alias
)
)
self.assertEquals(self.room_id, result)

# The alias should not be found.
# Confirm the alias is gone.
self.get_failure(
self.handler.get_association(self.room_alias),
synapse.api.errors.SynapseError,
Expand All @@ -187,23 +192,23 @@ def test_delete_alias_sufficient_power(self):
"""A user with a sufficient power level should be able to delete an alias."""
self._create_alias(self.admin_user)

# A user with sufficient power levels should be able to delete an alias.
other_user_id = "@other:test"
# Increase the user's power level.
self.helper.send_state(
self.room_id,
"m.room.power_levels",
{"users": {other_user_id: 100}},
{"users": {self.test_user: 100}},
tok=self.admin_user_tok,
)

# They can now delete the alias.
result = self.get_success(
self.handler.delete_association(
create_requester(other_user_id), self.room_alias
create_requester(self.test_user), self.room_alias
)
)
self.assertEquals(self.room_id, result)

# The alias should not be found.
# Confirm the alias is gone.
self.get_failure(
self.handler.get_association(self.room_alias),
synapse.api.errors.SynapseError,
Expand Down

0 comments on commit 9279219

Please sign in to comment.