Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 3.x] [Fixes #5801] Check User permissions for private group #6651

Merged
merged 1 commit into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions geonode/documents/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,10 @@ def document_metadata(
try:
all_metadata_author_groups = chain(
request.user.group_list_all(),
GroupProfile.objects.exclude(
access="private").exclude(access="public-invite"))
GroupProfile.objects.exclude(access="private"))
except Exception:
all_metadata_author_groups = GroupProfile.objects.exclude(
access="private").exclude(access="public-invite")
access="private")
[metadata_author_groups.append(item) for item in all_metadata_author_groups
if item not in metadata_author_groups]

Expand Down
170 changes: 169 additions & 1 deletion geonode/groups/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,174 @@ def test_users_belongs_registered_group(self):
self.assertTrue(groupprofile.user_is_role(admin, 'manager'))
self.assertFalse(admin in groupprofile.get_managers())

def test_users_group_list_view(self):
"""
1. Ensures that a superuser can see the whole group list.

2. Ensures that a user can see only public/public-invite groups.

3. Ensures that a user belonging to a private group, can see it.
"""
bobby = get_user_model().objects.get(username="bobby")

public_group, _public_created = GroupProfile.objects.get_or_create(
slug='public_group',
title='public_group',
access='public')
private_group, _private_created = GroupProfile.objects.get_or_create(
slug='private_group',
title='private_group',
access='private')

private_group.join(bobby)
data = {
"query": "p",
"page": 1,
"pageSize": 9
}

# Anonymous
"""
'{
"users": [], "count": 0,
"groups": [
{"name": "public_group", "title": "public_group"}]
}'
"""
response = self.client.post(
reverse(
'account_ajax_lookup'
),
data
)
self.assertEqual(response.status_code, 200)
content = json.loads(response.content)
logger.debug(f"Anonymous --> {content}")
self.assertEqual(len(content["groups"]), 1)
self.assertEqual(content["groups"][0]["name"], "public_group")
response = self.client.get(
reverse(
'group_detail',
args=['public_group', ])
)
self.assertEqual(response.status_code, 200)
response = self.client.get(
reverse(
'group_detail',
args=['private_group', ])
)
self.assertEqual(response.status_code, 404)

# Admin
"""
'{
"users": [], "count": 0,
"groups": [
{"name": "public_group", "title": "public_group"},
{"name": "private_group", "title": "private_group"}]
}'
"""
self.assertTrue(self.client.login(username="admin", password="admin"))
response = self.client.post(
reverse(
'account_ajax_lookup'
),
data
)
self.assertEqual(response.status_code, 200)
content = json.loads(response.content)
logger.debug(f"admin --> {content}")
self.assertEqual(len(content["groups"]), 2)
self.assertEqual(content["groups"][0]["name"], "public_group")
self.assertEqual(content["groups"][1]["name"], "private_group")
response = self.client.get(
reverse(
'group_detail',
args=['public_group', ])
)
self.assertEqual(response.status_code, 200)
response = self.client.get(
reverse(
'group_detail',
args=['private_group', ])
)
self.assertEqual(response.status_code, 200)

# Bobby
"""
'{
"users": [], "count": 0,
"groups": [
{"name": "public_group", "title": "public_group"},
{"name": "private_group", "title": "private_group"}]
}'
"""
self.assertTrue(self.client.login(username="bobby", password="bob"))
response = self.client.post(
reverse(
'account_ajax_lookup'
),
data
)
self.assertEqual(response.status_code, 200)
content = json.loads(response.content)
logger.debug(f"bobby --> {content}")
self.assertEqual(len(content["groups"]), 2)
self.assertEqual(content["groups"][0]["name"], "public_group")
self.assertEqual(content["groups"][1]["name"], "private_group")
response = self.client.get(
reverse(
'group_detail',
args=['public_group', ])
)
self.assertEqual(response.status_code, 200)
response = self.client.get(
reverse(
'group_detail',
args=['private_group', ])
)
self.assertEqual(response.status_code, 200)

# Norman
"""
'{
"users": [], "count": 0,
"groups": [
{"name": "public_group", "title": "public_group"}]
}'
"""
self.assertTrue(self.client.login(username="norman", password="norman"))
response = self.client.post(
reverse(
'account_ajax_lookup'
),
data
)
self.assertEqual(response.status_code, 200)
content = json.loads(response.content)
logger.debug(f"norman --> {content}")
self.assertEqual(len(content["groups"]), 1)
self.assertEqual(content["groups"][0]["name"], "public_group")
response = self.client.get(
reverse(
'group_detail',
args=['public_group', ])
)
self.assertEqual(response.status_code, 200)
response = self.client.get(
reverse(
'group_detail',
args=['private_group', ])
)
self.assertEqual(response.status_code, 404)

if _public_created:
public_group.delete()
self.assertFalse(GroupProfile.objects.filter(slug='public_group').exists())
if _private_created:
private_group.delete()
self.assertFalse(GroupProfile.objects.filter(slug='private_group').exists())

def test_group_permissions_extend_to_user(self):
"""
Ensures that when a user is in a group, the group permissions
Expand Down Expand Up @@ -661,7 +829,7 @@ def setUp(self):
super(GroupCategoriesTestCase, self).setUp()

c1 = GroupCategory.objects.create(name='test #1 category')
g = GroupProfile.objects.create(title='test')
g = GroupProfile.objects.create(slug='test', title='test')
g.categories.add(c1)
g.save()
User = get_user_model()
Expand Down
3 changes: 3 additions & 0 deletions geonode/groups/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ def get_queryset(self):
def get(self, request, *args, **kwargs):
self.group = get_object_or_404(
models.GroupProfile, slug=kwargs.get('slug'))
if self.group.access == 'private' and \
not self.group.user_is_member(request.user):
raise Http404
return super(GroupDetailView, self).get(request, *args, **kwargs)

def get_context_data(self, **kwargs):
Expand Down
5 changes: 2 additions & 3 deletions geonode/layers/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1185,11 +1185,10 @@ def layer_metadata(
try:
all_metadata_author_groups = chain(
request.user.group_list_all().distinct(),
GroupProfile.objects.exclude(
access="private").exclude(access="public-invite"))
GroupProfile.objects.exclude(access="private"))
except Exception:
all_metadata_author_groups = GroupProfile.objects.exclude(
access="private").exclude(access="public-invite")
access="private")
[metadata_author_groups.append(item) for item in all_metadata_author_groups
if item not in metadata_author_groups]

Expand Down
5 changes: 2 additions & 3 deletions geonode/maps/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,10 @@ def map_metadata(
try:
all_metadata_author_groups = chain(
request.user.group_list_all(),
GroupProfile.objects.exclude(
access="private").exclude(access="public-invite"))
GroupProfile.objects.exclude(access="private"))
except Exception:
all_metadata_author_groups = GroupProfile.objects.exclude(
access="private").exclude(access="public-invite")
access="private")
[metadata_author_groups.append(item) for item in all_metadata_author_groups
if item not in metadata_author_groups]

Expand Down
2 changes: 1 addition & 1 deletion geonode/people/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def enhanced_invitation_context(self, context):
user = context.get("inviter") if context.get("inviter") else context.get("user")
full_name = " ".join((user.first_name, user.last_name)) if user.first_name or user.last_name else None
user_groups = GroupProfile.objects.filter(
slug__in=user.groupmember_set.filter().values_list("group__slug", flat=True))
slug__in=user.groupmember_set.all().values_list("group__slug", flat=True))
enhanced_context = context.copy()
enhanced_context.update({
"username": user.username,
Expand Down
12 changes: 6 additions & 6 deletions geonode/tests/test_rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_super_admin_user(self, super_mock):
@patch('geonode.api.authorization.ApiLockdownAuthorization.read_list',
return_value=Group.objects.exclude(name='anonymous'))
@patch('geonode.people.models.Profile.group_list_all', return_value=[2])
def test_regular_user_hide_private(self, super_mock, mocke_profile):
def test_regular_user_hide_private(self, super_mock, mocked_profile):
mock_bundle = MagicMock()
request_mock = MagicMock()
r_attr = {
Expand All @@ -67,7 +67,7 @@ def test_regular_user_hide_private(self, super_mock, mocke_profile):
@patch('geonode.api.authorization.ApiLockdownAuthorization.read_list',
return_value=Group.objects.exclude(name='anonymous'))
@patch('geonode.people.models.Profile.group_list_all', return_value=[1])
def test_regular_user(self, super_mock, mocke_profile):
def test_regular_user(self, super_mock, mocked_profile):
mock_bundle = MagicMock()
request_mock = MagicMock()
r_attr = {
Expand All @@ -85,7 +85,7 @@ def test_regular_user(self, super_mock, mocke_profile):
@patch('geonode.api.authorization.ApiLockdownAuthorization.read_list',
return_value=Group.objects.exclude(name='anonymous'))
@patch('geonode.people.models.Profile.group_list_all', return_value=[1])
def test_anonymous_user(self, super_mock, mocke_profile):
def test_anonymous_user(self, super_mock, mocked_profile):
mock_bundle = MagicMock()
request_mock = MagicMock()
r_attr = {
Expand Down Expand Up @@ -122,7 +122,7 @@ def test_super_admin_user(self, super_mock):

@patch('geonode.api.authorization.ApiLockdownAuthorization.read_list', return_value=GroupProfile.objects.all())
@patch('geonode.people.models.Profile.group_list_all', return_value=[2])
def test_regular_user_hide_private(self, super_mock, mocke_profile):
def test_regular_user_hide_private(self, super_mock, mocked_profile):
mock_bundle = MagicMock()
request_mock = MagicMock()
r_attr = {
Expand All @@ -139,7 +139,7 @@ def test_regular_user_hide_private(self, super_mock, mocke_profile):

@patch('geonode.api.authorization.ApiLockdownAuthorization.read_list', return_value=GroupProfile.objects.all())
@patch('geonode.people.models.Profile.group_list_all', return_value=[1])
def test_regular_user(self, super_mock, mocke_profile):
def test_regular_user(self, super_mock, mocked_profile):
mock_bundle = MagicMock()
request_mock = MagicMock()
r_attr = {
Expand All @@ -156,7 +156,7 @@ def test_regular_user(self, super_mock, mocke_profile):

@patch('geonode.api.authorization.ApiLockdownAuthorization.read_list', return_value=GroupProfile.objects.all())
@patch('geonode.people.models.Profile.group_list_all', return_value=[1])
def test_anonymous_user(self, super_mock, mocke_profile):
def test_anonymous_user(self, super_mock, mocked_profile):
mock_bundle = MagicMock()
request_mock = MagicMock()
r_attr = {
Expand Down
25 changes: 19 additions & 6 deletions geonode/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#########################################################################
import json

from django import forms
import json
from django.db.models import Q
from django.urls import reverse
from django.conf import settings
Expand Down Expand Up @@ -84,15 +84,28 @@ def ajax_lookup(request):
content='use a field named "query" to specify a prefix to filter usernames',
content_type='text/plain')
keyword = request.POST['query']
users = get_user_model().objects.filter(Q(username__icontains=keyword)).exclude(Q(username='AnonymousUser') |
Q(is_active=False))
groups = GroupProfile.objects.filter(Q(title__icontains=keyword) |
Q(slug__icontains=keyword))
users = get_user_model().objects.filter(
Q(username__icontains=keyword)).exclude(Q(username='AnonymousUser') |
Q(is_active=False))
if request.user and request.user.is_authenticated and request.user.is_superuser:
groups = GroupProfile.objects.filter(
Q(title__icontains=keyword) |
Q(slug__icontains=keyword))
elif request.user.is_anonymous:
groups = GroupProfile.objects.filter(
Q(title__icontains=keyword) |
Q(slug__icontains=keyword)).exclude(Q(access='private'))
else:
groups = GroupProfile.objects.filter(
Q(title__icontains=keyword) |
Q(slug__icontains=keyword)).exclude(
Q(access='private') & ~Q(
slug__in=request.user.groupmember_set.all().values_list("group__slug", flat=True))
)
json_dict = {
'users': [({'username': u.username}) for u in users],
'count': users.count(),
}

json_dict['groups'] = [({'name': g.slug, 'title': g.title})
for g in groups]
return HttpResponse(
Expand Down