feat: new helper functions
- get_resource_dc_config
- get_resource_info
- s3cc.get_s3_dc_handle_basin_based
paulmueller committed Feb 27, 2024
1 parent 62b7823 commit 2c30994
Showing 9 changed files with 248 additions and 68 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG
@@ -1,5 +1,9 @@
0.7.5
0.8.0
- feat: introduce `get_resource_dc_config`, `get_resource_info`, and
`s3cc.get_s3_dc_handle_basin_based`
- enh: return S3 URL in s3cc.upload_artifact
- enh: make specifying owner organization and create context in
`testing.make_dataset` optional
0.7.4
- fix: disable basins when loading a dataset instance
- enh: implement s3cc.upload_artifact
4 changes: 3 additions & 1 deletion dcor_shared/__init__.py
@@ -1,5 +1,7 @@
# flake8: noqa: F401
from .ckan import get_ckan_config_option, get_resource_path
from .ckan import (
    get_ckan_config_option, get_resource_dc_config, get_resource_info,
    get_resource_path,
)
from .data import DUMMY_BYTES, sha256sum, wait_for_resource
from .dcinst import get_dc_instance
from .mime import DC_MIME_TYPES, VALID_FORMATS
38 changes: 37 additions & 1 deletion dcor_shared/ckan.py
@@ -41,7 +41,43 @@ def get_ckan_webassets_path():
    return pathlib.Path(get_ckan_config_option("ckan.webassets.path"))


def get_resource_path(rid, create_dirs=False):
def get_resource_dc_config(resource_id):
    """Return the DC metadata for a resource identifier

    For this to work, the dcor_schemas CKAN extension must be active.
    """
    _, res_dict = get_resource_info(resource_id)
    # build metadata dictionary from resource metadata
    meta = {}
    for item in res_dict:
        if item.startswith("dc:"):
            _, sec, key = item.split(":", 2)
            meta.setdefault(sec, {})
            meta[sec][key] = res_dict[item]
    return meta

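A sketch of the "dc:section:key" regrouping performed above; the resource fields shown are hypothetical:

# Hypothetical resource fields as stored by the dcor_schemas extension
res_dict = {"dc:experiment:event count": 47,
            "dc:setup:channel width": 20.0,
            "name": "calibration_beads.rtdc"}  # non-"dc:" keys are skipped
# The loop above regroups the "dc:" items into nested sections:
# {"experiment": {"event count": 47}, "setup": {"channel width": 20.0}}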

def get_resource_info(resource_id):
    """Return the dataset and resource dictionaries for a resource identifier

    The dataset dictionary is returned first, followed by the
    resource dictionary.
    """
    from ckan import logic
    res_dict = logic.get_action("resource_show")(
        context={'ignore_auth': True, 'user': 'default'},
        data_dict={"id": resource_id})
    ds_dict = logic.get_action("package_show")(
        context={'ignore_auth': True, 'user': 'default'},
        data_dict={"id": res_dict["package_id"]})
    return ds_dict, res_dict

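A brief usage sketch (the resource ID is hypothetical); note that the dataset dictionary comes first in the return value:

ds_dict, res_dict = get_resource_info("f0a6c3ea-2a30-4f0f-8d92-a5b8c1e8a3f1")
assert res_dict["package_id"] == ds_dict["id"]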

def get_resource_path(resource_id, create_dirs=False):
    """Return the expected local path for a resource identifier

    If `create_dirs` is True, create the parent directory tree.
    """
    rid = resource_id
    resources_path = get_ckan_storage_path() / "resources"
    pdir = resources_path / rid[:3] / rid[3:6]
    path = pdir / rid[6:]
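For illustration, the three-level sharding this produces (the resource ID is hypothetical and the prefix depends on the CKAN storage path):

rid = "f0a6c3ea-2a30-4f0f-8d92-a5b8c1e8a3f1"  # hypothetical UUID
get_resource_path(rid)
# -> <ckan-storage>/resources/f0a/6c3/ea-2a30-4f0f-8d92-a5b8c1e8a3f1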
71 changes: 55 additions & 16 deletions dcor_shared/s3cc.py
@@ -4,14 +4,20 @@
via just the resource ID.
"""
from __future__ import annotations
import io
import functools
import pathlib
from typing import Literal
from urllib.parse import urlparse
import warnings

from dclab.rtdc_dataset import fmt_s3
import dclab
from dclab.rtdc_dataset import fmt_hdf5, fmt_s3
import h5py

from .ckan import get_ckan_config_option
from .ckan import (
    get_ckan_config_option, get_resource_dc_config, get_resource_info
)
from .data import sha256sum
from . import s3

@@ -81,13 +87,7 @@ def get_s3_bucket_name_for_resource(resource_id):
    The resource with the identifier `resource_id` must exist in the
    CKAN database.
    """
    import ckan.logic
    res_dict = ckan.logic.get_action('resource_show')(
        context={'ignore_auth': True, 'user': 'default'},
        data_dict={"id": resource_id})
    ds_dict = ckan.logic.get_action('package_show')(
        context={'ignore_auth': True, 'user': 'default'},
        data_dict={'id': res_dict["package_id"]})
    ds_dict, _ = get_resource_info(resource_id)
    bucket_name = get_ckan_config_option(
        "dcor_object_store.bucket_name").format(
            organization_id=ds_dict["organization"]["id"])
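The bucket name template is deployment-specific. Assuming, for illustration only, that `dcor_object_store.bucket_name` is set to "circle-{organization_id}", the lookup resolves as follows:

# Assumption: dcor_object_store.bucket_name = "circle-{organization_id}"
"circle-{organization_id}".format(
    organization_id="2f1c8d8a-0e46-4f2c-9d8e-0a1b2c3d4e5f")  # hypothetical ID
# -> "circle-2f1c8d8a-0e46-4f2c-9d8e-0a1b2c3d4e5f"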
@@ -97,6 +97,9 @@
def get_s3_dc_handle(resource_id):
    """Return an instance of :class:`RTDC_S3`

    The data are accessed directly via S3 using DCOR's access credentials.
    Use this if you need to access the original raw file.

    The resource with the identifier `resource_id` must exist in the
    CKAN database.
    """
@@ -114,6 +117,48 @@ def get_s3_dc_handle(resource_id):
    return ds


def get_s3_dc_handle_basin_based(resource_id):
    """Return a :class:`RTDC_HTTP`-basin-backed instance of :class:`RTDC_HDF5`

    The returned instance does not contain any feature data, but has
    basins defined that link to the original data on S3. The upside
    over :func:`get_s3_dc_handle` is that the returned dataset
    includes the basin with the condensed data and that the returned
    instance does not contain the DCOR S3 credentials. The downside is
    that initialization takes slightly longer and that, if private
    resources are accessed, the presigned URLs in the basins are only
    valid for a fixed time period.
    """
    ds_dict, res_dict = get_resource_info(resource_id)
    basin_paths = []
    for artifact in ["resource", "condensed"]:
        if ds_dict["private"]:
            bp = create_presigned_url(resource_id, artifact=artifact)
        else:
            bp = get_s3_url_for_artifact(resource_id, artifact=artifact)
        basin_paths.append(bp)

    fd = io.BytesIO()
    with h5py.File(fd, "w", libver="latest") as hv:
        # We don't use RTDCWriter as a context manager to avoid overhead
        # during __exit__, but then we have to make sure "events" is there.
        hv.require_group("events")
        hw = dclab.RTDCWriter(hv)
        hw.store_metadata(get_resource_dc_config(resource_id))
        for bp in basin_paths:
            urlp = urlparse(bp)  # e.g. http://localhost/bucket/resource/id
            hw.store_basin(
                basin_name=urlp.path.split("/")[2],  # e.g. "resource"
                basin_format="http",
                basin_type="remote",
                basin_locs=[bp],
                # Don't verify anything. This would only cost time,
                # and we know these objects exist.
                verify=False,
            )
    return fmt_hdf5.RTDC_HDF5(fd)

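A usage sketch (the resource ID is hypothetical): feature data are fetched on demand through the HTTP basins, so the returned handle itself carries no S3 credentials:

with get_s3_dc_handle_basin_based("f0a6c3ea-2a30-4f0f-8d92-a5b8c1e8a3f1") as ds:
    print(ds.basins)          # the "resource" and "condensed" HTTP basins
    deform = ds["deform"][:]  # loaded via a basin URL, not from local storage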

def get_s3_url_for_artifact(
        resource_id: str,
        artifact: Literal["condensed", "preview", "resource"] = "resource"):
@@ -193,13 +238,7 @@ def upload_artifact(
    if private is None:
        # User did not say whether the resource is private. We have to
        # find out ourselves.
        import ckan.logic
        res_dict = ckan.logic.get_action('resource_show')(
            context={'ignore_auth': True, 'user': 'default'},
            data_dict={"id": resource_id})
        ds_dict = ckan.logic.get_action('package_show')(
            context={'ignore_auth': True, 'user': 'default'},
            data_dict={'id': res_dict["package_id"]})
        ds_dict, _ = get_resource_info(resource_id)
        private = ds_dict["private"]

    rid = resource_id
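Per the CHANGELOG entry above, `upload_artifact` now returns the S3 URL of the uploaded object. A sketch; all argument names other than `resource_id` and `private` are assumptions, since the full signature is truncated in this view:

# Hypothetical call; `path_artifact` is an assumed parameter name
s3_url = upload_artifact(resource_id=rid, path_artifact=local_path)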
22 changes: 20 additions & 2 deletions dcor_shared/testing.py
@@ -1,5 +1,7 @@
from io import BytesIO

import ckan.authz
import ckan.tests.factories as factories
import ckan.tests.helpers as helpers
from ckan.tests.pytest_ckan.fixtures import FakeFileStorage
import requests
@@ -34,8 +36,25 @@ def factory(data, filename, context=None, **kwargs):
    return factory


def make_dataset(create_context, owner_org, create_with_upload=None,
def make_dataset(create_context=None, owner_org=None, create_with_upload=None,
                 resource_path=None, activate=False, **kwargs):
    """Create a dataset with a resource for testing"""
    if create_context is None:
        user = factories.User()
        create_context = {'ignore_auth': False,
                          'user': user['name'],
                          'api_version': 3}
        user_id = user["id"]
    else:
        # get the user ID from the given create_context
        user_id = ckan.authz.get_user_id_for_username(create_context["user"])

    if owner_org is None:
        owner_org = factories.Organization(users=[{
            'name': user_id,
            'capacity': 'admin'
        }])

    if "title" not in kwargs:
        kwargs["title"] = "test-dataset"
    if "authors" not in kwargs:
Expand Down Expand Up @@ -91,7 +110,6 @@ def synchronous_enqueue_job(job_func, args=None, kwargs=None, title=None,
"""
Synchronous mock for ``ckan.plugins.toolkit.enqueue_job``.
Due to the asynchronous nature of background jobs, code that uses them
needs to be handled specially when writing tests.
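With `create_context` and `owner_org` now optional, a test user and owner organization are created on the fly, so calls shrink to the form used by the new tests below:

ds_dict, res_dict = make_dataset(
    create_with_upload=create_with_upload,
    resource_path=data_path / "calibration_beads_47.rtdc",
    activate=True)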
53 changes: 53 additions & 0 deletions tests/test_ckan.py
@@ -0,0 +1,53 @@
import pathlib
from unittest import mock

import ckanext.dcor_schemas.plugin
import pytest

from dcor_shared import get_resource_dc_config, get_resource_info
from dcor_shared.testing import make_dataset, synchronous_enqueue_job

data_path = pathlib.Path(__file__).parent / "data"


@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas')
@pytest.mark.usefixtures('clean_db', 'with_request_context')
@mock.patch('ckan.plugins.toolkit.enqueue_job',
            side_effect=synchronous_enqueue_job)
def test_get_resource_dc_config(enqueue_job_mock, create_with_upload,
                                monkeypatch):
    monkeypatch.setattr(
        ckanext.dcor_schemas.plugin,
        'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
        True)

    _, res_dict = make_dataset(
        create_with_upload=create_with_upload,
        resource_path=data_path / "calibration_beads_47.rtdc",
        activate=True)

    dc_config = get_resource_dc_config(res_dict["id"])
    assert dc_config["experiment"]["event count"] == 47
    assert dc_config["experiment"]["date"] == "2018-12-11"
    assert dc_config["fluorescence"]["laser count"] == 2


@pytest.mark.ckan_config('ckan.plugins', 'dcor_schemas')
@pytest.mark.usefixtures('clean_db', 'with_request_context')
@mock.patch('ckan.plugins.toolkit.enqueue_job',
            side_effect=synchronous_enqueue_job)
def test_get_resource_info(enqueue_job_mock, create_with_upload, monkeypatch):
    monkeypatch.setattr(
        ckanext.dcor_schemas.plugin,
        'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
        True)

    ds_dict, res_dict = make_dataset(
        create_with_upload=create_with_upload,
        resource_path=data_path / "calibration_beads_47.rtdc",
        activate=True)

    ds_dict_2, res_dict_2 = get_resource_info(res_dict["id"])
    assert ds_dict == ds_dict_2
    assert res_dict == res_dict_2
20 changes: 2 additions & 18 deletions tests/test_data.py
@@ -29,19 +29,11 @@ def test_get_dc_instance_file(enqueue_job_mock, create_with_upload,
        'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
        True)

    user = factories.User()
    owner_org = factories.Organization(users=[{
        'name': user['id'],
        'capacity': 'admin'
    }])
    create_context = {'ignore_auth': False,
                      'user': user['name'],
                      'api_version': 3}
    ds_dict, _ = make_dataset(
        create_context, owner_org,
        create_with_upload=create_with_upload,
        resource_path=data_path / "calibration_beads_47.rtdc",
        activate=True)

    rid = ds_dict["resources"][0]["id"]
    resource_path = pathlib.Path(get_resource_path(rid))
    assert resource_path.exists(), "sanity check"
@@ -75,19 +67,11 @@ def test_get_dc_instance_s3(enqueue_job_mock, create_with_upload,
        'DISABLE_AFTER_DATASET_CREATE_FOR_CONCURRENT_JOB_TESTS',
        True)

    user = factories.User()
    owner_org = factories.Organization(users=[{
        'name': user['id'],
        'capacity': 'admin'
    }])
    create_context = {'ignore_auth': False,
                      'user': user['name'],
                      'api_version': 3}
    ds_dict, _ = make_dataset(
        create_context, owner_org,
        create_with_upload=create_with_upload,
        resource_path=data_path / "calibration_beads_47.rtdc",
        activate=True)

    res_dict = ds_dict["resources"][0]
    rid = res_dict["id"]
    resource_path = pathlib.Path(get_resource_path(rid))