Radiology report exports #203

Merged Jan 12, 2024 (46 commits)

Commits
9f90e29  WIP - refactor in preparation for Radiology Extract; plus rough sketch (jeremyestein, Jan 3, 2024)
a860920  WIP - tidy up (jeremyestein, Jan 3, 2024)
d6501f8  Pre-commit fixes (jeremyestein, Jan 3, 2024)
b54991a  Fix circular import problem when running test (jeremyestein, Jan 3, 2024)
89a75aa  Fix tests previously calling ParquetExport. Make the root dir a (jeremyestein, Jan 3, 2024)
0a1e80b  pre-commmit fixes (jeremyestein, Jan 3, 2024)
2af5f9a  Fix broken import line (jeremyestein, Jan 3, 2024)
99ff8ce  [Fix conflicts now so it's easier when we merge back in] (jeremyestein, Jan 4, 2024)
a2199fc  Oops think I needed a real merge commit to mark it as resolved [no (jeremyestein, Jan 4, 2024)
1e4f811  Merge branch 'main' into jeremy/issue-161-radiology (jeremyestein, Jan 4, 2024)
4d0d8e7  Fixup merge (jeremyestein, Jan 4, 2024)
0c2addb  WIP - try to define the PIXL DB query and conversion of data for parquet (jeremyestein, Jan 4, 2024)
1ed9a70  Fix some pre-commit issues (jeremyestein, Jan 4, 2024)
814bed7  Further sketching (jeremyestein, Jan 4, 2024)
fe76b68  Add pandas dep (jeremyestein, Jan 4, 2024)
fbb747d  Add image identifier to ehr_* tables and fill in the plain and hashed (jeremyestein, Jan 5, 2024)
b22ebe2  Add procedure_occurrence_id to the DB as well, since we need it in the (jeremyestein, Jan 5, 2024)
06c7147  Export to parquet and more pre-commit fixes (jeremyestein, Jan 5, 2024)
d479f5b  Always import datetime to prevent pydantic error (milanmlft, Jan 8, 2024)
14345b1  Procedure occurrence ID should be an `int` (milanmlft, Jan 8, 2024)
07fb60f  Add missing columns in test (milanmlft, Jan 8, 2024)
c32dd88  Symlink radiology export (milanmlft, Jan 8, 2024)
ee23522  Call the new code from the test. Need pyarrow in ehr-api (jeremyestein, Jan 8, 2024)
e9413cd  Add column names to parquet file (jeremyestein, Jan 8, 2024)
e52f023  Merge branch 'main' into jeremy/issue-161-radiology (jeremyestein, Jan 9, 2024)
98a422b  Raise exception rather than assert (jeremyestein, Jan 9, 2024)
acbff17  Using list instead of Iterable makes error go away (jeremyestein, Jan 9, 2024)
90ddf0a  Return full path so can check during testing (jeremyestein, Jan 9, 2024)
9895aac  Add `project_name` to PIXL DB table (milanmlft, Jan 9, 2024)
0eb3837  Test radiology exports (milanmlft, Jan 9, 2024)
9802dae  Fix mypy warning re return type of decorated function. We don't need the (jeremyestein, Jan 10, 2024)
adbee69  Match on project name *and* extract timestamp when defining what to (jeremyestein, Jan 10, 2024)
e09d4b7  Check parquet file doesn't reveal unhashed identifiers (jeremyestein, Jan 10, 2024)
dcb5118  method should not start with test_ if not a test (jeremyestein, Jan 10, 2024)
0c23188  Fix some comments (jeremyestein, Jan 11, 2024)
ead9848  Comment fixes (jeremyestein, Jan 11, 2024)
d8b1339  Put query->DataFrame conversion code in the same place (jeremyestein, Jan 11, 2024)
161b158  Clarify comment (jeremyestein, Jan 11, 2024)
06bb90c  Test that cogstack has been run (milanmlft, Jan 11, 2024)
728ac08  Rename file as it's no longer just about OMOP (jeremyestein, Jan 11, 2024)
6eabe11  Merge branch 'main' into jeremy/issue-161-radiology (jeremyestein, Jan 11, 2024)
41c2371  Remove superfluous hashing API endpoint; just use the generic one. (jeremyestein, Jan 11, 2024)
4f4d89a  Merge branch 'main' into jeremy/issue-161-radiology (milanmlft, Jan 12, 2024)
0cdfda8  Update `project_name` and `extract_datetime` to be non-optional (milanmlft, Jan 12, 2024)
8fc8f50  Update docstrings for `_omop_files()` (milanmlft, Jan 12, 2024)
ab6b200  Format docstring (milanmlft, Jan 12, 2024)

Changes from all commits

8 changes: 3 additions & 5 deletions cli/src/pixl_cli/_io.py
@@ -19,7 +19,7 @@
from typing import TYPE_CHECKING

import pandas as pd
from core.omop import OmopExtract
from core.exports import ParquetExport
from core.patient_queue.message import Message, deserialise

from pixl_cli._logging import logger
@@ -28,9 +28,6 @@
if TYPE_CHECKING:
from pathlib import Path

# instance of omop extract, can be overriden during testing
extract = OmopExtract()


def messages_from_state_file(filepath: Path) -> list[Message]:
"""
@@ -56,7 +53,8 @@ def copy_parquet_return_logfile_fields(parquet_path: Path) -> tuple[str, datetim
logs = json.load(log_file.open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.fromisoformat(logs["datetime"])
project_name_slug = extract.copy_to_exports(parquet_path, project_name, omop_es_timestamp)
extract = ParquetExport(project_name, omop_es_timestamp)
project_name_slug = extract.copy_to_exports(parquet_path)
return project_name_slug, omop_es_timestamp


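For reference, `copy_parquet_return_logfile_fields` only consumes two fields from the OMOP ES log file. A minimal sketch of that parsing step as a hypothetical helper, assuming a JSON log shaped like the one read above (the log filename itself is elided from the diff):

```python
import json
from datetime import datetime
from pathlib import Path


def read_log_fields(log_file: Path) -> tuple[str, datetime]:
    # Only these two keys are consumed: the project name and the extract time.
    logs = json.load(log_file.open())
    project_name = logs["settings"]["cdm_source_name"]
    omop_es_timestamp = datetime.fromisoformat(logs["datetime"])
    return project_name, omop_es_timestamp
```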
15 changes: 4 additions & 11 deletions cli/tests/conftest.py
@@ -18,22 +18,15 @@

import pytest
from core.database import Base, Extract, Image
from core.omop import OmopExtract
from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker


@pytest.fixture(autouse=True)
def omop_files(tmp_path_factory: pytest.TempPathFactory, monkeypatch) -> OmopExtract:
"""
Replace production extract instance with one writing to a tmpdir.

:returns OmopExtract: For direct use when the fixture is explicity called.
"""
export_dir = tmp_path_factory.mktemp("repo_base")
tmpdir_extract = OmopExtract(export_dir)
monkeypatch.setattr("pixl_cli._io.extract", tmpdir_extract)
return tmpdir_extract
def _omop_files(tmp_path_factory: pytest.TempPathFactory, monkeypatch) -> None:
"""Replace production extract instance with one writing to a tmpdir."""
tmpdir_extract = tmp_path_factory.mktemp("repo_base")
monkeypatch.setattr("core.exports.ParquetExport.root_dir", tmpdir_extract)


@pytest.fixture()
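The simplified fixture works because `root_dir` is now a class attribute on `ParquetExport` (see the `exports.py` diff below), so patching it once on the class redirects every instance a test creates. A minimal sketch of the mechanism, with the class reduced to the relevant lines and an illustrative default path:

```python
import pathlib


class ParquetExport:
    # Class-level default; tests monkeypatch this single attribute.
    root_dir: pathlib.Path = pathlib.Path("/install/root")  # illustrative default

    def __init__(self) -> None:
        # Read at construction time, so a patch applied before the test body
        # runs is picked up by every instance created inside the test.
        self.export_dir = ParquetExport.root_dir / "exports"


# Equivalent to monkeypatch.setattr("core.exports.ParquetExport.root_dir", ...),
# except monkeypatch also restores the original value after the test.
ParquetExport.root_dir = pathlib.Path("/tmp/repo_base")
assert ParquetExport().export_dir == pathlib.Path("/tmp/repo_base/exports")
```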
21 changes: 14 additions & 7 deletions cli/tests/test_copy_omop.py
@@ -17,9 +17,10 @@
import datetime

import pytest
from core.exports import ParquetExport


def test_new_project_copies(omop_files, resources):
def test_new_project_copies(resources):
"""
Given a valid export directory and hasn't been exported before
When copy to exports is run
@@ -29,8 +30,9 @@ def test_new_project_copies(omop_files, resources):
input_dir = resources / "omop"
project_name = "Really great cool project"
input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files = ParquetExport(project_name, input_date)
# ACT
omop_files.copy_to_exports(input_dir, project_name, input_date)
omop_files.copy_to_exports(input_dir)
# ASSERT
output_base = omop_files.export_dir / "really-great-cool-project"

@@ -47,7 +49,7 @@
assert symlinked_dir.is_symlink()


def test_second_export(omop_files, resources):
def test_second_export(resources):
"""
Given one export already exists for the project
When a second export with a different timestamp is run for the same project
@@ -58,11 +60,15 @@
input_dir = resources / "omop"
project_name = "Really great cool project"
first_export_datetime = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files.copy_to_exports(input_dir, project_name, first_export_datetime)

omop_files = ParquetExport(project_name, first_export_datetime)
omop_files.copy_to_exports(input_dir)
second_export_datetime = datetime.datetime.fromisoformat("2020-07-10T18:00:00")

omop_files = ParquetExport(project_name, second_export_datetime)

# ACT
omop_files.copy_to_exports(input_dir, project_name, second_export_datetime)
omop_files.copy_to_exports(input_dir)

# ASSERT
output_base = omop_files.export_dir / "really-great-cool-project"
@@ -76,7 +82,7 @@
assert previous_export_dir.exists()


def test_project_with_no_public(omop_files, resources):
def test_project_with_no_public(resources):
"""
Given an export directory which has no "public" subdirectory
When copy to exports is run
@@ -85,7 +91,8 @@
input_dir = resources
project_name = "Really great cool project"
input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files = ParquetExport(project_name, input_date)
with pytest.raises(FileNotFoundError) as error_info:
omop_files.copy_to_exports(input_dir, project_name, input_date)
omop_files.copy_to_exports(input_dir)

assert error_info.match("Could not find public")
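
Read together, these tests imply the on-disk layout that `ParquetExport` maintains per project. Roughly, after the two exports in `test_second_export` (timestamp slugs shown as slugify would typically render them):

```
exports/
  really-great-cool-project/
    all_extracts/
      omop/
        2020-06-10t18-00-00/public/   (first export, still present)
        2020-07-10t18-00-00/public/   (second export)
    latest/
      omop/
        public -> all_extracts/omop/2020-07-10t18-00-00/public
```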
1 change: 1 addition & 0 deletions pixl_core/pyproject.toml
@@ -22,6 +22,7 @@ dependencies = [
"jsonpickle==3.0.2",
"sqlalchemy==2.0.24",
"psycopg2-binary==2.9.9",
"pandas==1.5.1",
]

[project.optional-dependencies]
71 changes: 46 additions & 25 deletions pixl_core/src/core/omop.py → pixl_core/src/core/exports.py
@@ -24,17 +24,31 @@
if TYPE_CHECKING:
import datetime

import pandas as pd


root_from_install = pathlib.Path(__file__).parents[3]

logger = logging.getLogger(__file__)


class OmopExtract:
"""Processing Omop extracts on the filesystem."""
class ParquetExport:
"""Exporting Omop and Emap extracts to Parquet files."""

def __init__(self, root_dir: pathlib.Path = root_from_install) -> None:
"""Create instance of OMOPExtract helper."""
self.export_dir = root_dir / "exports"
root_dir: pathlib.Path = root_from_install

def __init__(self, project_name: str, extract_datetime: datetime.datetime) -> None:
"""
:param project_name: name of the project
:param extract_datetime: datetime that the OMOP ES extract was run
"""
self.export_dir = ParquetExport.root_dir / "exports"
self.project_slug, self.extract_time_slug = self._get_slugs(project_name, extract_datetime)
self.export_base = self.export_dir / self.project_slug
current_extract = self.export_base / "all_extracts" / "omop" / self.extract_time_slug
self.public_output = current_extract / "public"
self.radiology_output = current_extract / "radiology"
self.latest_parent_dir = self.export_base / "latest" / "omop"

@staticmethod
def _get_slugs(project_name: str, extract_datetime: datetime.datetime) -> tuple[str, str]:
@@ -43,19 +57,11 @@
extract_time_slug = slugify.slugify(extract_datetime.isoformat())
return project_slug, extract_time_slug

def copy_to_exports(
self,
omop_dir: pathlib.Path,
project_name: str,
extract_datetime: datetime.datetime,
) -> str:
def copy_to_exports(self, omop_dir: pathlib.Path) -> str:
"""
Copy public omop directory as the latest extract for the project.

Creates directories if they don't already exist.
:param omop_dir: parent path for omop export, with a "public" subdirectory
:param project_name: name of the project
:param extract_datetime: datetime that the OMOP ES extract was run
:raises FileNotFoundError: if there is no public subdirectory in `omop_dir`
:returns str: the project slug, so this can be registered for export to the DSH
"""
@@ -65,24 +71,39 @@
raise FileNotFoundError(msg)

# Make directory for exports if they don't exist
project_slug, extract_time_slug = self._get_slugs(project_name, extract_datetime)
export_base = self.export_dir / project_slug
public_output = OmopExtract._mkdir(
export_base / "all_extracts" / "omop" / extract_time_slug / "public"
)
logger.info("Copying public parquet files from %s to %s", omop_dir, public_output)
ParquetExport._mkdir(self.public_output)
logger.info("Copying public parquet files from %s to %s", omop_dir, self.public_output)

# Copy extract files, overwriting if it exists
shutil.copytree(public_input, public_output, dirs_exist_ok=True)
shutil.copytree(public_input, self.public_output, dirs_exist_ok=True)
# Make the latest export dir if it doesn't exist
latest_parent_dir = self._mkdir(export_base / "latest" / "omop")

self._mkdir(self.latest_parent_dir)
# Symlink this extract to the latest directory
latest_public = latest_parent_dir / "public"
latest_public = self.latest_parent_dir / "public"
if latest_public.exists():
latest_public.unlink()

latest_public.symlink_to(public_output, target_is_directory=True)
return project_slug
latest_public.symlink_to(self.public_output, target_is_directory=True)
return self.project_slug

def export_radiology(self, export_df: pd.DataFrame) -> pathlib.Path:
"""Export radiology reports to parquet file"""
self._mkdir(self.radiology_output)
parquet_file = self.radiology_output / "radiology.parquet"

export_df.to_parquet(parquet_file)

# Make the "latest" export dir if it doesn't exist
self._mkdir(self.latest_parent_dir)
# Symlink this report to the latest directory
latest_parquet_file = self.latest_parent_dir / "radiology.parquet"
if latest_parquet_file.exists():
latest_parquet_file.unlink()

latest_parquet_file.symlink_to(parquet_file, target_is_directory=False)

return self.radiology_output

@staticmethod
def _mkdir(directory: pathlib.Path) -> pathlib.Path:
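A short usage sketch of the reworked class; the project name and timestamp are the illustrative values from the tests above, and the input path (plus its `public` subdirectory) is an assumption:

```python
import pathlib
from datetime import datetime

import pandas as pd
from core.exports import ParquetExport

export = ParquetExport(
    project_name="Really great cool project",
    extract_datetime=datetime.fromisoformat("2020-06-10T18:00:00"),
)

# Copies <omop_dir>/public into all_extracts/omop/<timestamp-slug>/public,
# re-points latest/omop/public at it, and returns the project slug.
# Assumes /data/omop-extract/public exists, else FileNotFoundError is raised.
slug = export.copy_to_exports(pathlib.Path("/data/omop-extract"))

# Writes radiology.parquet into the same per-extract tree and refreshes
# the latest/omop/radiology.parquet symlink.
reports = pd.DataFrame(columns=["image_identifier", "procedure_occurrence_id", "image_report"])
export.export_radiology(reports)
```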
1 change: 1 addition & 0 deletions pixl_ehr/pyproject.toml
@@ -17,6 +17,7 @@ dependencies = [
"psycopg2==2.9.5",
"azure-identity==1.12.0",
"azure-storage-blob==12.14.1",
"pyarrow==14.0.1",
"PyYAML==6.0",
]

14 changes: 14 additions & 0 deletions pixl_ehr/src/pixl_ehr/_databases.py
@@ -17,12 +17,15 @@
from pathlib import Path
from typing import TYPE_CHECKING, Optional

import pandas as pd
import psycopg2 as pypg
from decouple import config

logger = logging.getLogger("uvicorn")

if TYPE_CHECKING:
from datetime import datetime

from pixl_ehr._processing import PatientEHRData
from pixl_ehr._queries import SQLQuery

@@ -105,3 +108,14 @@ def contains(self, data: PatientEHRData) -> bool:
query = "SELECT * FROM emap_data.ehr_raw WHERE mrn = %s and accession_number = %s"
self._cursor.execute(query=str(query), vars=[data.mrn, data.accession_number])
return self._cursor.fetchone() is not None

def get_radiology_reports(self, project_name: str, extract_datetime: datetime) -> pd.DataFrame:
"""Get all radiology reports for a given study."""
query = (
"SELECT image_identifier, procedure_occurrence_id, xray_report FROM emap_data.ehr_anon "
"WHERE project_name = %s AND extract_datetime = %s"
)
self._cursor.execute(query=str(query), vars=[project_name, extract_datetime])
anon_data = self._cursor.fetchall()
parquet_header_names = ["image_identifier", "procedure_occurrence_id", "image_report"]
return pd.DataFrame(anon_data, columns=parquet_header_names)
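
Note the deliberate rename in flight: the query selects `xray_report`, but because the fetched rows are paired positionally with `parquet_header_names`, the resulting DataFrame exposes that column as `image_report`. A hedged sketch of a caller (connection details come from the environment via `decouple.config`; the argument values are illustrative):

```python
from datetime import datetime

from pixl_ehr._databases import PIXLDatabase

db = PIXLDatabase()
reports = db.get_radiology_reports(
    "Really great cool project",
    datetime.fromisoformat("2020-06-10T18:00:00"),
)
# Columns: image_identifier, procedure_occurrence_id, image_report
print(reports.head())
```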
20 changes: 20 additions & 0 deletions pixl_ehr/src/pixl_ehr/_processing.py
@@ -23,6 +23,9 @@
from typing import TYPE_CHECKING, Optional

import requests

if TYPE_CHECKING:
from core.patient_queue.message import Message
from decouple import config

from pixl_ehr._databases import EMAPStar, PIXLDatabase
@@ -71,6 +74,10 @@ class PatientEHRData:

mrn: str
accession_number: str
image_identifier: str
procedure_occurrence_id: int
project_name: str
extract_datetime: datetime
acquisition_datetime: Optional[datetime]

age: Optional[int] = None
@@ -91,6 +98,10 @@ def from_message(cls, message: Message) -> PatientEHRData:
self = PatientEHRData(
mrn=message.mrn,
accession_number=message.accession_number,
image_identifier=message.mrn + message.accession_number,
procedure_occurrence_id=message.procedure_occurrence_id,
project_name=message.project_name,
extract_datetime=message.omop_es_timestamp,
acquisition_datetime=message.study_date,
)

@@ -119,13 +130,17 @@ def persist(self, database: PIXLDatabase, schema_name: str, table_name: str) ->
col_names = [
"mrn",
"accession_number",
"image_identifier",
"procedure_occurrence_id",
"age",
"sex",
"ethnicity",
"height",
"weight",
"gcs",
"xray_report",
"project_name",
"extract_datetime",
]

cols = ",".join(col_names)
@@ -136,13 +151,17 @@
[
self.mrn,
self.accession_number,
self.image_identifier,
self.procedure_occurrence_id,
self.age,
self.sex,
self.ethnicity,
self.height,
self.weight,
self.glasgow_coma_scale,
self.report_text,
self.project_name,
self.extract_datetime,
],
)
logger.debug("Persist successful!")
@@ -156,6 +175,7 @@ def anonymise(self) -> PatientEHRData:
self.accession_number = pixl_hash(
self.accession_number, endpoint_path="hash-accession-number"
)
self.image_identifier = pixl_hash(self.image_identifier, endpoint_path="hash")
self.acquisition_datetime = None

return self
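The new `image_identifier` is simply the MRN concatenated with the accession number; it is persisted in plain form in the raw table and replaced with its hash (via the generic `hash` endpoint) by `anonymise()`. A compressed sketch of that flow, with the hashing call stubbed out since the real one goes over HTTP to the hashing API:

```python
def pixl_hash(value: str, endpoint_path: str) -> str:
    # Stand-in for the HTTP call the real pixl_hash makes to the hashing API.
    return f"hashed({endpoint_path}):{value}"


mrn, accession_number = "12345678", "ACC001"  # illustrative values
image_identifier = mrn + accession_number     # as built in from_message()

# anonymise() later swaps the plain identifier for its hash:
image_identifier = pixl_hash(image_identifier, endpoint_path="hash")
```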
19 changes: 19 additions & 0 deletions pixl_ehr/src/pixl_ehr/main.py
@@ -17,10 +17,14 @@
import asyncio
import importlib.metadata
import logging
from datetime import (
datetime, # noqa: TCH003, always import datetime otherwise pydantic throws error
)
from pathlib import Path

from azure.identity import EnvironmentCredential
from azure.storage.blob import BlobServiceClient
from core.exports import ParquetExport
from core.patient_queue import PixlConsumer
from core.router import router, state
from decouple import config
@@ -58,6 +62,21 @@ async def startup_event() -> None:
task.add_done_callback(background_tasks.discard)


@app.post(
"/export-radiology-as-parquet",
summary="Copy all matching radiology reports in the PIXL DB to a parquet file",
)
def export_radiology_as_parquet(project_name: str, extract_datetime: datetime) -> None:
"""
Batch export of all matching radiology reports in PIXL DB to a parquet file.
NOTE: we can't check that all reports in the queue have been processed, so
we are relying on the user waiting until processing has finished before running this.
"""
anon_data = PIXLDatabase().get_radiology_reports(project_name, extract_datetime)
pe = ParquetExport(project_name, extract_datetime)
pe.export_radiology(anon_data)


@app.get(
"/az-copy-current",
summary="Copy the current state of the PIXL anon EHR schema to azure",
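Because `project_name` and `extract_datetime` are plain parameters on the handler, FastAPI reads them from the query string. An example call, assuming the ehr-api listens on localhost port 8000 (host and port are assumptions, as are the parameter values):

```python
import requests

response = requests.post(
    "http://localhost:8000/export-radiology-as-parquet",  # host/port assumed
    params={
        "project_name": "Really great cool project",
        "extract_datetime": "2020-06-10T18:00:00",
    },
)
response.raise_for_status()
```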
2 changes: 1 addition & 1 deletion pixl_ehr/tests/dummy-services/cogstack.py
@@ -31,4 +31,4 @@ async def redact(request: fastapi.Request) -> PlainTextResponse:
"""Mocked Cogstack redact endpoint, used for integration testing."""
await asyncio.sleep(2)
body = await request.body()
return PlainTextResponse(body.decode("utf-8"))
return PlainTextResponse(body.decode("utf-8") + "**DE-IDENTIFIED**")
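
Appending a visible marker gives the integration test something concrete to assert on, confirming the report text really passed through the mocked CogStack redaction step. A sketch of such an assertion, with an illustrative report string:

```python
processed_report_text = "CT head report ... **DE-IDENTIFIED**"  # illustrative output
assert processed_report_text.endswith("**DE-IDENTIFIED**")
```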