Skip to content

Commit

Permalink
PB-35 Validate the external assets in Django Admin
Browse files Browse the repository at this point in the history
Refactor the validators on the external asset serializer to the
validators module for that purpose. Refactored the Django admin for
Assets a bit, fixed a bug in the process.
  • Loading branch information
schtibe committed Jul 29, 2024
1 parent aacf949 commit 1dbca44
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 140 deletions.
66 changes: 43 additions & 23 deletions app/stac_api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from stac_api.models import Provider
from stac_api.utils import build_asset_href
from stac_api.utils import get_query_params
from stac_api.validators import validate_href_url
from stac_api.validators import validate_text_to_geometry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -387,8 +388,50 @@ def get_fieldsets(self, request, obj=None):
return fields


class AssetAdminForm(forms.ModelForm):

def __init__(self, *args, **kwargs):
"""If it's an external asset, we switch the file field to a char field"""
super().__init__(*args, **kwargs)
if self.instance is not None and self.instance.id is not None:
are_external_assets_allowed = self.instance.item.collection.allow_external_assets

if are_external_assets_allowed:
external_field = self.fields['is_external']
external_field.help_text = (
_('Whether this asset is hosted externally. Save the form in '
'order to toggle the file field between input and file widget.')
)

if self.instance.is_external:
# can't just change the widget, otherwise it is impossible to
# change the value!
self.fields['file'] = forms.CharField()

# make it a bit wider
self.fields['file'].widget.attrs['size'] = 150
self.fields['file'].widget.attrs['placeholder'
] = 'https://map.geo.admin.ch/external.jpg'

def clean_file(self):
if self.instance:
external_changed = 'is_external' in self.changed_data
is_external = self.cleaned_data.get('is_external')

# if we're just changing from internal to external, so we don't
# validate the url, because it is potentially still the previous,
# internal file
if is_external and not external_changed:
file = self.cleaned_data.get('file')

validate_href_url(file, self.instance.item.collection)

return self.cleaned_data.get('file')


@admin.register(Asset)
class AssetAdmin(admin.ModelAdmin):
form = AssetAdminForm

class Media:
js = ('js/admin/asset_help_search.js', 'js/admin/asset_external_fields.js')
Expand Down Expand Up @@ -476,29 +519,6 @@ def href(self, instance):
return path
return build_asset_href(self.request, path)

def render_change_form(self, request, context, *args, **kwargs):
"""Make the file field be a text input if asset is external"""
obj = kwargs['obj']
if obj is not None:
are_external_assets_allowed = obj.item.collection.allow_external_assets

form_instance = context['adminform'].form

if are_external_assets_allowed:
external_field = form_instance.fields['is_external']
external_field.help_text = (
_('Whether this asset is hosted externally. Save the form in '
'order to toggle the file field between input and file widget.')
)

if obj.is_external:
form_instance.fields['file'].widget = forms.TextInput(
attrs={
'size': 150, 'placeholder': 'https://map.geo.admin.ch/external.jpg'
}
)
return super().render_change_form(request, context, *args, **kwargs)

def get_fieldsets(self, request, obj=None):
"""Build the different field sets for the admin page
Expand Down
82 changes: 7 additions & 75 deletions app/stac_api/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
from collections import OrderedDict
from urllib.parse import urlparse

import requests

from django.conf import settings
from django.contrib.gis.geos import GEOSGeometry
from django.core import exceptions
from django.core.validators import URLValidator
from django.core.exceptions import ValidationError as CoreValidationError
from django.utils.translation import gettext_lazy as _

from rest_framework import serializers
Expand Down Expand Up @@ -41,6 +38,7 @@
from stac_api.validators import validate_checksum_multihash_sha256
from stac_api.validators import validate_content_encoding
from stac_api.validators import validate_geoadmin_variant
from stac_api.validators import validate_href_url
from stac_api.validators import validate_item_properties_datetimes
from stac_api.validators import validate_md5_parts
from stac_api.validators import validate_name
Expand Down Expand Up @@ -402,76 +400,6 @@ def to_representation(self, instance):
)
return representation

def _validate_general_url_pattern(self, url):
try:
validator = URLValidator()
validator(url)
except exceptions.ValidationError as exc:
errors = {'href': _('Invalid URL provided')}
raise serializers.ValidationError(code='payload', detail=errors) from exc

def _validate_configured_url_pattern(self, url):
"""Validate the URL against the whitelist"""
whitelist = self.collection.external_asset_whitelist

for entry in whitelist:
if url.startswith(entry):
return True

logger.info(
"Attempted external asset upload didn't match the whitelist",
extra={
'url': url, 'whitelist': whitelist, 'collection': self.collection
}
)

# none of the prefixes matches
errors = {'href': _("Invalid URL provided. It doesn't match the collection whitelist")}
raise serializers.ValidationError(code='payload', detail=errors)

def _validate_reachability(self, url):
unreachable_error = {'href': _('Provided URL is unreachable')}
try:
response = requests.head(url, timeout=settings.EXTERNAL_URL_REACHABLE_TIMEOUT)

if response.status_code > 400:
logger.info(
"Attempted external asset upload failed the reachability check",
extra={
'url': url,
'collection': self.collection,
'response': response,
}
)
raise serializers.ValidationError(code='payload', detail=unreachable_error)
except requests.Timeout as exc:
logger.info(
"Attempted external asset upload resulted in a timeout",
extra={
'url': url,
'collection': self.collection,
'exception': exc,
'timeout': settings.EXTERNAL_URL_REACHABLE_TIMEOUT
}
)
errors = {'href': _('Checking href URL resulted in timeout')}
raise serializers.ValidationError(code='payload', detail=errors) from exc
except requests.ConnectionError as exc:
logger.info(
"Attempted external asset upload resulted in connection error",
extra={
'url': url,
'collection': self.collection,
'exception': exc,
}
)
raise serializers.ValidationError(code='payload', detail=unreachable_error) from exc

def _validate_href_url(self, url):
self._validate_general_url_pattern(url)
self._validate_configured_url_pattern(url)
self._validate_reachability(url)

def _validate_href_field(self, attrs):
"""Only allow the href field if the collection allows for external assets
Expand All @@ -495,7 +423,11 @@ def _validate_href_field(self, attrs):
errors = {'href': _("Found read-only property in payload")}
raise serializers.ValidationError(code="payload", detail=errors)

self._validate_href_url(attrs['file'])
try:
validate_href_url(attrs['file'], collection)
except CoreValidationError as e:
errors = {'href': e.message}
raise serializers.ValidationError(code='payload', detail=errors)

def validate(self, attrs):
self._validate_href_field(attrs)
Expand Down
80 changes: 80 additions & 0 deletions app/stac_api/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
from datetime import datetime

import multihash
import requests
from multihash.constants import CODE_HASHES
from multihash.constants import HASH_CODES

from django.conf import settings
from django.contrib.gis.gdal.error import GDALException
from django.contrib.gis.geos import GEOSGeometry
from django.contrib.gis.geos.error import GEOSException
from django.core import exceptions
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
from django.utils.translation import gettext_lazy as _

from stac_api.utils import fromisoformat
Expand Down Expand Up @@ -555,3 +559,79 @@ def validate_content_encoding(value):
params={'encoding': value, 'supported_encodings': ', '.join(supported_encodings)},
code='invalid'
)


def _validate_general_url_pattern(url):
try:
validator = URLValidator()
validator(url)
except exceptions.ValidationError as exc:
error = _('Invalid URL provided')
raise ValidationError(error) from exc


def _validate_configured_url_pattern(url, collection):
"""Validate the URL against the whitelist"""
whitelist = collection.external_asset_whitelist

for entry in whitelist:
if url.startswith(entry):
return True

logger.info(
"Attempted external asset upload didn't match the whitelist",
extra={
'url': url, 'whitelist': whitelist, 'collection': collection
}
)

# none of the prefixes matches
error = _("Invalid URL provided. It doesn't match the collection whitelist")
raise ValidationError(error)


def _validate_reachability(url, collection):
unreachable_error = _('Provided URL is unreachable')
try:
response = requests.head(url, timeout=settings.EXTERNAL_URL_REACHABLE_TIMEOUT)

if response.status_code > 400:
logger.info(
"Attempted external asset upload failed the reachability check",
extra={
'url': url,
'collection': collection,
'response': response,
}
)
raise ValidationError(unreachable_error)
except requests.Timeout as exc:
logger.info(
"Attempted external asset upload resulted in a timeout",
extra={
'url': url,
'collection': collection,
'exception': exc,
'timeout': settings.EXTERNAL_URL_REACHABLE_TIMEOUT
}
)
error = _('Checking href URL resulted in timeout')
raise ValidationError(error) from exc
except requests.ConnectionError as exc:
logger.info(
"Attempted external asset upload resulted in connection error",
extra={
'url': url,
'collection': collection,
'exception': exc,
}
)
raise ValidationError(unreachable_error) from exc


def validate_href_url(url, collection):
"""Validate the href URL """

_validate_general_url_pattern(url)
_validate_configured_url_pattern(url, collection)
_validate_reachability(url, collection)
42 changes: 0 additions & 42 deletions app/tests/tests_10/test_external_assets_endpoint.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from parameterized import parameterized

from django.conf import settings
from django.test import Client

from rest_framework import serializers

from stac_api.models import Asset
from stac_api.serializers import AssetSerializer

from tests.tests_10.base_test import StacBaseTestCase
from tests.tests_10.data_factory import Factory
Expand Down Expand Up @@ -223,40 +218,3 @@ def test_delete_asset_with_external_url(self):
)

self.assertStatusCode(200, response)

@parameterized.expand([
(['https://test-domain.test'], 'https://test-domain.test/collection/test.jpg', True),
(['https://test-domain'], 'https://test-domain.test/collection/test.jpg', True),
(['https://test-domaine', 'https://test-domain'],
'https://test-domain.test/collection/test.jpg',
True), # trying to keep the formatting stable
(['https://test-domain.test/collection'],
'https://test-domain.test/collection/test.jpg',
True), # trying to keep the formatting stable here
(['https://test-domain.tst', 'https://something-else.ch'],
'https://test-domain.test/collection/test.jpg',
False),
(['http://test-domain.test'], 'https://test-domain.tst/collection/test.jpg', False),
(['https://test-domain.test'], 'http://test-domain.test/collection/test.jpg', False)
])
def test_create_external_asset_with_collection_pattern(self, patterns, href, result):
collection = self.collection

# item = self.item

class Obj:

@property
def collection(self):
collection.external_asset_whitelist = patterns
collection.save()

return collection

if result:
# pylint: disable=W0212:protected-access
AssetSerializer._validate_configured_url_pattern(Obj(), href)
else:
with self.assertRaises(serializers.ValidationError):
# pylint: disable=W0212:protected-access
AssetSerializer._validate_configured_url_pattern(Obj(), href)
Loading

0 comments on commit 1dbca44

Please sign in to comment.