Skip to content

Commit

Permalink
Copy parquet files to exports directory (#182)
Browse files Browse the repository at this point in the history
* Copy parquet files to exports directory

* Document OMOP ES file exporting

* Add copyright header

* Add more copyright headers

* Update documentation

Co-authored-by: Milan Malfait <m.malfait@ucl.ac.uk>

---------

Co-authored-by: Milan Malfait <m.malfait@ucl.ac.uk>
  • Loading branch information
stefpiatek and milanmlft authored Dec 13, 2023
1 parent 763be0f commit a86fbe1
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,6 @@ dmypy.json

# VS Code
.vscode

# project specific files
/exports/
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ repos:
pass_filenames: false
entry: mypy
args: ['--config-file=setup.cfg']
additional_dependencies: ['mypy', 'types-PyYAML', 'types-requests']
additional_dependencies: ['mypy', 'types-PyYAML', 'types-requests', 'types-python-slugify']
32 changes: 32 additions & 0 deletions cli/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""CLI testing fixtures."""
import pathlib

import pytest
from core.omop import OmopExtract


@pytest.fixture()
def omop_files(tmp_path_factory: pytest.TempPathFactory) -> OmopExtract:
    """
    Provide an OmopExtract rooted in a fresh temporary directory.

    The temporary directory stands in for the repository base, so the
    extract's ``exports`` directory is created inside it rather than under
    the default install root.
    """
    repo_base = tmp_path_factory.mktemp("repo_base")
    return OmopExtract(root_dir=repo_base)


@pytest.fixture()
def resources() -> pathlib.Path:
    """Path of the ``resources`` directory that sits next to this conftest file."""
    tests_dir = pathlib.Path(__file__).parent
    return tests_dir / "resources"
Binary file added cli/tests/resources/omop/public/CARE_SITE.parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added cli/tests/resources/omop/public/LOCATION.parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added cli/tests/resources/omop/public/PERSON.parquet
Binary file not shown.
Binary file not shown.
Binary file added cli/tests/resources/omop/public/SPECIMEN.parquet
Binary file not shown.
Binary file not shown.
Binary file not shown.
95 changes: 95 additions & 0 deletions cli/tests/test_copy_omop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test copying of OMOP ES data for later export."""
import datetime

import pytest


def test_new_project_copies(omop_files, resources):
    """
    Given a valid export directory and hasn't been exported before
    When copy to exports is run
    Then the public files should be copied and symlinked to the latest export directory
    """
    # ARRANGE
    input_dir = resources / "omop"
    project_name = "Really great cool project"
    input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")

    # ACT
    omop_files.copy_to_exports(input_dir, project_name, input_date)

    # ASSERT
    output_base = omop_files.export_dir / "really-great-cool-project"

    # check public files copied
    specific_export_dir = (
        output_base / "all_extracts" / "omop" / "2020-06-10t18-00-00" / "public"
    )
    assert specific_export_dir.exists()
    # Path.glob yields entries in filesystem order, which may differ between the
    # source resources and the temporary export directory, so compare sorted names.
    expected_files = sorted(x.stem for x in (input_dir / "public").glob("*.parquet"))
    output_files = sorted(x.stem for x in specific_export_dir.glob("*.parquet"))
    # guard against the comparison passing vacuously with two empty lists
    assert expected_files
    assert expected_files == output_files

    # check that symlinked files exist
    symlinked_dir = output_base / "latest" / "omop" / "public"
    symlinked_files = sorted(x.stem for x in symlinked_dir.glob("*.parquet"))
    assert expected_files == symlinked_files
    assert symlinked_dir.is_symlink()


def test_second_export(omop_files, resources):
    """
    Given one export already exists for the project
    When a second export with a different timestamp is run for the same project
    Then both exports should be kept under the all_extracts dir,
    and the latest symlink should point at the most recently copied export
    """
    # ARRANGE
    source_dir = resources / "omop"
    project = "Really great cool project"
    earlier = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
    later = datetime.datetime.fromisoformat("2020-07-10T18:00:00")
    omop_files.copy_to_exports(source_dir, project, earlier)

    # ACT
    omop_files.copy_to_exports(source_dir, project, later)

    # ASSERT
    project_dir = omop_files.export_dir / "really-great-cool-project"
    extracts_dir = project_dir / "all_extracts" / "omop"
    newest_public = extracts_dir / "2020-07-10t18-00-00" / "public"
    oldest_public = extracts_dir / "2020-06-10t18-00-00" / "public"
    latest_link = project_dir / "latest" / "omop" / "public"

    assert newest_public.exists()
    # the symlink must target the most recent export, not the earlier one
    link_target = latest_link.readlink()
    assert link_target == newest_public
    assert link_target != oldest_public
    # the earlier export is kept alongside the new one
    assert oldest_public.exists()


def test_project_with_no_public(omop_files, resources):
    """
    Given an input directory which has no "public" subdirectory
    When copy to exports is run
    Then a FileNotFoundError mentioning the missing public directory is raised
    """
    # ARRANGE: the resources root itself has no "public" child
    omop_dir_without_public = resources
    project = "Really great cool project"
    extract_time = datetime.datetime.fromisoformat("2020-06-10T18:00:00")

    # ACT / ASSERT
    with pytest.raises(FileNotFoundError, match="Could not find public"):
        omop_files.copy_to_exports(omop_dir_without_public, project, extract_time)
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ volumes:
orthanc-anon-data:
orthanc-raw-data:
postgres-data:
exports:

networks:
pixl-net:
Expand Down Expand Up @@ -247,6 +248,8 @@ services:
retries: 5
networks:
- pixl-net
volumes:
- ${PWD}/exports:/run/exports

pacs-api:
build:
Expand Down
29 changes: 29 additions & 0 deletions pixl_core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,32 @@ The client of choice for RabbitMQ at this point in time is [pika](https://pika.r
asynchronous way of transferring messages. The former is geared towards high data throughput whereas the latter is geared towards stability.
The asynchronous mode of transferring messages is a lot more complex as it is based on the
[asyncio event loop](https://docs.python.org/3/library/asyncio-eventloop.html).


### OMOP ES files

Public parquet exports from OMOP ES that should be transferred outside the hospital are copied to the `exports` directory at the repository base.

Within this directory each project has its own directory. Every extract that has been run is stored under `all_extracts`, and the `latest` directory
contains a symlink to the most recent extract. This symlinking makes it clear during the export stage which extract should be sent.

```
├── project-1
│   ├── all_extracts
│   │   └── omop
│   │       ├── 2020-06-10t18-00-00
│   │       │   └── public
│   │       └── 2020-07-10t18-00-00
│   │           └── public
│   └── latest
│       └── omop
│           └── public -> ../../../ all_extracts / omop / 2020-07-10t18-00-00 / public
└── project-2
    ├── all_extracts
    │   └── omop
    │       └── 2023-12-13t16-22-40
    │           └── public
    └── latest
        └── omop
            └── public -> ../../../ all_extracts / omop / 2023-12-13t16-22-40 / public
```
1 change: 1 addition & 0 deletions pixl_core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies = [
"fastapi==0.103.2",
"token-bucket==0.3.0",
"python-decouple==3.6",
"python-slugify==8.0.1",
"pika==1.3.1",
"aio_pika==8.2.4",
"environs==9.5.0",
Expand Down
86 changes: 86 additions & 0 deletions pixl_core/src/core/omop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Processing of OMOP parquet files."""
import datetime
import pathlib
import shutil

import slugify

# Default root under which the "exports" directory is placed. This file lives at
# pixl_core/src/core/omop.py, so parents[3] is the directory containing pixl_core.
# NOTE(review): presumably assumes a source/editable install; for a regular
# install this would resolve relative to site-packages -- confirm.
root_from_install = pathlib.Path(__file__).parents[3]


class OmopExtract:
    """
    Processing Omop extracts on the filesystem.

    Public OMOP ES parquet files are copied to
    ``<export_dir>/<project-slug>/all_extracts/omop/<extract-time-slug>/public``
    and ``<export_dir>/<project-slug>/latest/omop/public`` is (re)created as a
    symlink to the most recently copied extract.
    """

    def __init__(self, root_dir: pathlib.Path = root_from_install) -> None:
        """
        Create instance of OmopExtract helper.

        :param root_dir: directory in which the ``exports`` directory lives
        """
        self.export_dir = root_dir / "exports"

    @staticmethod
    def _get_slugs(
        project_name: str, extract_datetime: datetime.datetime
    ) -> tuple[str, str]:
        """
        Convert project name and datetime to slugs for writing to filesystem.

        :param project_name: name of the project
        :param extract_datetime: datetime that the OMOP ES extract was run
        :returns: tuple of (project slug, slug of the ISO formatted datetime)
        """
        project_slug = slugify.slugify(project_name)
        extract_time_slug = slugify.slugify(extract_datetime.isoformat())
        return project_slug, extract_time_slug

    def copy_to_exports(
        self,
        omop_dir: pathlib.Path,
        project_name: str,
        extract_datetime: datetime.datetime,
    ) -> str:
        """
        Copy public omop directory as the latest extract for the project.
        Creates directories if they don't already exist.
        :param omop_dir: parent path for omop export, with a "public" subdirectory
        :param project_name: name of the project
        :param extract_datetime: datetime that the OMOP ES extract was run
        :raises FileNotFoundError: if there is no public subdirectory in `omop_dir`
        :returns str: the project slug, so this can be registered for export to the DSH
        """
        public_input = omop_dir / "public"
        if not public_input.exists():
            msg = f"Could not find public directory in input {omop_dir}"
            raise FileNotFoundError(msg)

        # Make directory for exports if they don't exist
        project_slug, extract_time_slug = self._get_slugs(
            project_name, extract_datetime
        )
        export_base = self.export_dir / project_slug
        public_output = self._mkdir(
            export_base / "all_extracts" / "omop" / extract_time_slug / "public"
        )

        # Copy extract files, overwriting if it exists
        shutil.copytree(public_input, public_output, dirs_exist_ok=True)
        # Make the latest export dir if it doesn't exist
        latest_parent_dir = self._mkdir(export_base / "latest" / "omop")
        # Symlink this extract to the latest directory
        latest_public = latest_parent_dir / "public"
        # `exists()` follows symlinks, so a dangling link (its target has been
        # removed) reports False and `symlink_to` would then fail with
        # FileExistsError; check `is_symlink()` too so a stale link is replaced.
        if latest_public.is_symlink() or latest_public.exists():
            latest_public.unlink()

        latest_public.symlink_to(public_output, target_is_directory=True)
        return project_slug

    @staticmethod
    def _mkdir(directory: pathlib.Path) -> pathlib.Path:
        """Create `directory` (and any missing parents) if needed and return it."""
        directory.mkdir(parents=True, exist_ok=True)
        return directory

0 comments on commit a86fbe1

Please sign in to comment.