Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Radiology report exports #203

Merged
merged 46 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
9f90e29
WIP - refactor in preparation for Radiology Extract; plus rough sketch
jeremyestein Jan 3, 2024
a860920
WIP - tidy up
jeremyestein Jan 3, 2024
d6501f8
Pre-commit fixes
jeremyestein Jan 3, 2024
b54991a
Fix circular import problem when running test
jeremyestein Jan 3, 2024
89a75aa
Fix tests previously calling ParquetExport. Make the root dir a
jeremyestein Jan 3, 2024
0a1e80b
pre-commmit fixes
jeremyestein Jan 3, 2024
2af5f9a
Fix broken import line
jeremyestein Jan 3, 2024
99ff8ce
[Fix conflicts now so it's easier when we merge back in]
jeremyestein Jan 4, 2024
a2199fc
Oops think I needed a real merge commit to mark it as resolved [no
jeremyestein Jan 4, 2024
1e4f811
Merge branch 'main' into jeremy/issue-161-radiology
jeremyestein Jan 4, 2024
4d0d8e7
Fixup merge
jeremyestein Jan 4, 2024
0c2addb
WIP - try to define the PIXL DB query and conversion of data for parquet
jeremyestein Jan 4, 2024
1ed9a70
Fix some pre-commit issues
jeremyestein Jan 4, 2024
814bed7
Further sketching
jeremyestein Jan 4, 2024
fe76b68
Add pandas dep
jeremyestein Jan 4, 2024
fbb747d
Add image identifier to ehr_* tables and fill in the plain and hashed
jeremyestein Jan 5, 2024
b22ebe2
Add procedure_occurrence_id to the DB as well, since we need it in the
jeremyestein Jan 5, 2024
06c7147
Export to parquet and more pre-commit fixes
jeremyestein Jan 5, 2024
d479f5b
Always import datetime to prevent pydantic error
milanmlft Jan 8, 2024
14345b1
Procedure occurrence ID should be an `int`
milanmlft Jan 8, 2024
07fb60f
Add missing columns in test
milanmlft Jan 8, 2024
c32dd88
Symlink radiology export
milanmlft Jan 8, 2024
ee23522
Call the new code from the test. Need pyarrow in ehr-api
jeremyestein Jan 8, 2024
e9413cd
Add column names to parquet file
jeremyestein Jan 8, 2024
e52f023
Merge branch 'main' into jeremy/issue-161-radiology
jeremyestein Jan 9, 2024
98a422b
Raise exception rather than assert
jeremyestein Jan 9, 2024
acbff17
Using list instead of Iterable makes error go away
jeremyestein Jan 9, 2024
90ddf0a
Return full path so can check during testing
jeremyestein Jan 9, 2024
9895aac
Add `project_name` to PIXL DB table
milanmlft Jan 9, 2024
0eb3837
Test radiology exports
milanmlft Jan 9, 2024
9802dae
Fix mypy warning re return type of decorated function. We don't need the
jeremyestein Jan 10, 2024
adbee69
Match on project name *and* extract timestamp when defining what to
jeremyestein Jan 10, 2024
e09d4b7
Check parquet file doesn't reveal unhashed identifiers
jeremyestein Jan 10, 2024
dcb5118
method should not start with test_ if not a test
jeremyestein Jan 10, 2024
0c23188
Fix some comments
jeremyestein Jan 11, 2024
ead9848
Comment fixes
jeremyestein Jan 11, 2024
d8b1339
Put query->DataFrame conversion code in the same place
jeremyestein Jan 11, 2024
161b158
Clarify comment
jeremyestein Jan 11, 2024
06bb90c
Test that cogstack has been run
milanmlft Jan 11, 2024
728ac08
Rename file as it's no longer just about OMOP
jeremyestein Jan 11, 2024
6eabe11
Merge branch 'main' into jeremy/issue-161-radiology
jeremyestein Jan 11, 2024
41c2371
Remove superfluous hashing API endpoint; just use the generic one.
jeremyestein Jan 11, 2024
4f4d89a
Merge branch 'main' into jeremy/issue-161-radiology
milanmlft Jan 12, 2024
0cdfda8
Update `project_name` and `extract_datetime` to be non-optional
milanmlft Jan 12, 2024
8fc8f50
Update docstrings for `_omop_files()`
milanmlft Jan 12, 2024
ab6b200
Format docstring
milanmlft Jan 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions cli/src/pixl_cli/_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
from pathlib import Path

import pandas as pd
from core.omop import OmopExtract
from core.omop import ParquetExport
from core.patient_queue.message import Message, deserialise

from pixl_cli._logging import logger
from pixl_cli._utils import string_is_non_empty

# instance of omop extract, can be overriden during testing
extract = OmopExtract()


def messages_from_state_file(filepath: Path) -> list[Message]:
"""
Expand Down Expand Up @@ -55,7 +52,8 @@ def copy_parquet_return_logfile_fields(parquet_path: Path) -> tuple[str, datetim
logs = json.load(log_file.open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.fromisoformat(logs["datetime"])
project_name_slug = extract.copy_to_exports(parquet_path, project_name, omop_es_timestamp)
extract = ParquetExport(project_name, omop_es_timestamp)
project_name_slug = extract.copy_to_exports(parquet_path)
return project_name_slug, omop_es_timestamp


Expand Down
11 changes: 4 additions & 7 deletions cli/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,19 @@

import pytest
from core.database import Base, Extract, Image
from core.omop import OmopExtract
from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker


@pytest.fixture(autouse=True)
def omop_files(tmp_path_factory: pytest.TempPathFactory, monkeypatch) -> OmopExtract:
def omop_files(tmp_path_factory: pytest.TempPathFactory, monkeypatch) -> None:
"""
Replace production extract instance with one writing to a tmpdir.

:returns OmopExtract: For direct use when the fixture is explicitly called.
:returns ParquetExport: For direct use when the fixture is explicitly called.
milanmlft marked this conversation as resolved.
Show resolved Hide resolved
"""
export_dir = tmp_path_factory.mktemp("repo_base")
tmpdir_extract = OmopExtract(export_dir)
monkeypatch.setattr("pixl_cli._io.extract", tmpdir_extract)
return tmpdir_extract
tmpdir_extract = tmp_path_factory.mktemp("repo_base")
monkeypatch.setattr("core.omop.ParquetExport.root_dir", tmpdir_extract)


@pytest.fixture()
Expand Down
21 changes: 14 additions & 7 deletions cli/tests/test_copy_omop.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
import datetime

import pytest
from core.omop import ParquetExport


def test_new_project_copies(omop_files, resources):
def test_new_project_copies(resources):
"""
Given a valid export directory and hasn't been exported before
When copy to exports is run
Expand All @@ -29,8 +30,9 @@ def test_new_project_copies(omop_files, resources):
input_dir = resources / "omop"
project_name = "Really great cool project"
input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files = ParquetExport(project_name, input_date)
# ACT
omop_files.copy_to_exports(input_dir, project_name, input_date)
omop_files.copy_to_exports(input_dir)
# ASSERT
output_base = omop_files.export_dir / "really-great-cool-project"

Expand All @@ -47,7 +49,7 @@ def test_new_project_copies(omop_files, resources):
assert symlinked_dir.is_symlink()


def test_second_export(omop_files, resources):
def test_second_export(resources):
"""
Given one export already exists for the project
When a second export with a different timestamp is run for the same project
Expand All @@ -58,11 +60,15 @@ def test_second_export(omop_files, resources):
input_dir = resources / "omop"
project_name = "Really great cool project"
first_export_datetime = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files.copy_to_exports(input_dir, project_name, first_export_datetime)

omop_files = ParquetExport(project_name, first_export_datetime)
omop_files.copy_to_exports(input_dir)
second_export_datetime = datetime.datetime.fromisoformat("2020-07-10T18:00:00")

omop_files = ParquetExport(project_name, second_export_datetime)

# ACT
omop_files.copy_to_exports(input_dir, project_name, second_export_datetime)
omop_files.copy_to_exports(input_dir)

# ASSERT
output_base = omop_files.export_dir / "really-great-cool-project"
Expand All @@ -76,7 +82,7 @@ def test_second_export(omop_files, resources):
assert previous_export_dir.exists()


def test_project_with_no_public(omop_files, resources):
def test_project_with_no_public(resources):
"""
Given an export directory which has no "public" subdirectory
When copy to exports is run
Expand All @@ -85,7 +91,8 @@ def test_project_with_no_public(omop_files, resources):
input_dir = resources
project_name = "Really great cool project"
input_date = datetime.datetime.fromisoformat("2020-06-10T18:00:00")
omop_files = ParquetExport(project_name, input_date)
with pytest.raises(FileNotFoundError) as error_info:
omop_files.copy_to_exports(input_dir, project_name, input_date)
omop_files.copy_to_exports(input_dir)

assert error_info.match("Could not find public")
1 change: 1 addition & 0 deletions pixl_core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"jsonpickle==3.0.2",
"sqlalchemy==2.0.24",
"psycopg2-binary==2.9.9",
"pandas==1.5.1",
]

[project.optional-dependencies]
Expand Down
73 changes: 48 additions & 25 deletions pixl_core/src/core/omop.py
milanmlft marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,39 @@
import logging
import pathlib
import shutil
from collections.abc import Iterable
from typing import TYPE_CHECKING

import pandas as pd
import slugify

if TYPE_CHECKING:
import datetime


root_from_install = pathlib.Path(__file__).parents[3]

logger = logging.getLogger(__file__)


class OmopExtract:
"""Processing Omop extracts on the filesystem."""
class ParquetExport:
"""Exporting Omop and Emap extracts to Parquet files."""

root_dir: pathlib.Path = root_from_install

def __init__(self, root_dir: pathlib.Path = root_from_install) -> None:
"""Create instance of OMOPExtract helper."""
self.export_dir = root_dir / "exports"
def __init__(self, project_name: str, extract_datetime: datetime.datetime) -> None:
    """
    Set up the export directory layout for one (project, extract) pair.

    :param project_name: name of the project
    :param extract_datetime: datetime that the OMOP ES extract was run
    """
    # root_dir is read from the class attribute so tests can monkeypatch it
    # in one place (see the conftest fixture patching core.omop.ParquetExport.root_dir).
    self.export_dir = ParquetExport.root_dir / "exports"
    self.project_slug, self.extract_time_slug = self._get_slugs(project_name, extract_datetime)
    self.export_base = self.export_dir / self.project_slug
    current_extract = self.export_base / "all_extracts" / "omop" / self.extract_time_slug
    self.public_output = current_extract / "public"
    self.radiology_output = current_extract / "radiology"
    self.latest_parent_dir = self.export_base / "latest" / "omop"

@staticmethod
def _get_slugs(project_name: str, extract_datetime: datetime.datetime) -> tuple[str, str]:
Expand All @@ -43,19 +58,11 @@ def _get_slugs(project_name: str, extract_datetime: datetime.datetime) -> tuple[
extract_time_slug = slugify.slugify(extract_datetime.isoformat())
return project_slug, extract_time_slug

def copy_to_exports(
self,
omop_dir: pathlib.Path,
project_name: str,
extract_datetime: datetime.datetime,
) -> str:
def copy_to_exports(self, omop_dir: pathlib.Path) -> str:
"""
Copy public omop directory as the latest extract for the project.

Creates directories if they don't already exist.
:param omop_dir: parent path for omop export, with a "public" subdirectory
:param project_name: name of the project
:param extract_datetime: datetime that the OMOP ES extract was run
:raises FileNotFoundError: if there is no public subdirectory in `omop_dir`
:returns str: the project slug, so this can be registered for export to the DSH
"""
Expand All @@ -65,24 +72,40 @@ def copy_to_exports(
raise FileNotFoundError(msg)

# Make directory for exports if they don't exist
project_slug, extract_time_slug = self._get_slugs(project_name, extract_datetime)
export_base = self.export_dir / project_slug
public_output = OmopExtract._mkdir(
export_base / "all_extracts" / "omop" / extract_time_slug / "public"
)
logger.info("Copying public parquet files from %s to %s", omop_dir, public_output)
ParquetExport._mkdir(self.public_output)
logger.info("Copying public parquet files from %s to %s", omop_dir, self.public_output)

# Copy extract files, overwriting if it exists
shutil.copytree(public_input, public_output, dirs_exist_ok=True)
shutil.copytree(public_input, self.public_output, dirs_exist_ok=True)
# Make the latest export dir if it doesn't exist
latest_parent_dir = self._mkdir(export_base / "latest" / "omop")

self._mkdir(self.latest_parent_dir)
# Symlink this extract to the latest directory
latest_public = latest_parent_dir / "public"
latest_public = self.latest_parent_dir / "public"
if latest_public.exists():
latest_public.unlink()

latest_public.symlink_to(public_output, target_is_directory=True)
return project_slug
latest_public.symlink_to(self.public_output, target_is_directory=True)
return self.project_slug

def export_radiology(self, anon_data: Iterable[tuple]) -> str:
    """
    Export radiology reports to parquet file (work in progress).

    Columns that will be required in the parquet output: accession number,
    OMOP ES study id, the de-identified report text, a link to the DICOM
    image, and EHR imaging identifiers.

    :param anon_data: anonymised report rows from the PIXL DB
    :returns: the project slug
    """
    self._mkdir(self.radiology_output)

    # Header names will need converting before the file is written.
    report_frame = pd.DataFrame(anon_data)

    # TODO: write report_frame out with to_parquet() and symlink the result
    # into the "latest" directory, as copy_to_exports does for public files.

    return self.project_slug

@staticmethod
def _mkdir(directory: pathlib.Path) -> pathlib.Path:
Expand Down
12 changes: 12 additions & 0 deletions pixl_ehr/src/pixl_ehr/_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from __future__ import annotations

import logging
from collections.abc import Iterable
from pathlib import Path
from typing import TYPE_CHECKING, Optional

Expand Down Expand Up @@ -105,3 +106,14 @@ def contains(self, data: PatientEHRData) -> bool:
query = "SELECT * FROM emap_data.ehr_raw WHERE mrn = %s and accession_number = %s"
self._cursor.execute(query=str(query), vars=[data.mrn, data.accession_number])
return self._cursor.fetchone() is not None

def get_radiology_reports(self) -> Iterable[tuple]:
    """
    Fetch every anonymised radiology report row from the PIXL DB.

    Preferably this would be filtered by study, but there is no column
    for that yet, so all rows of emap_data.ehr_anon are returned.

    :returns: list of (accession_number, xray_report) tuples
    """
    select_reports = "SELECT accession_number, xray_report FROM emap_data.ehr_anon"
    self._cursor.execute(query=select_reports)
    return self._cursor.fetchall()
11 changes: 11 additions & 0 deletions pixl_ehr/src/pixl_ehr/_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
import logging
import os
from abc import ABC, abstractmethod
from collections.abc import Iterable
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, Optional

import requests

if TYPE_CHECKING:
from core.patient_queue.message import Message
from core.omop import ParquetExport
from decouple import config

from pixl_ehr._databases import EMAPStar, PIXLDatabase
Expand Down Expand Up @@ -65,6 +70,12 @@ async def process_message(message: Message) -> None:
anon_data.persist(pixl_db, schema_name="emap_data", table_name="ehr_anon")


def export_radiology_reports(anon_data: Iterable[tuple]) -> None:
    """
    Export anonymised radiology report rows to a parquet file via ParquetExport.

    :param anon_data: iterable of result-row tuples from the PIXL DB,
        (accession_number, xray_report) — see PIXLDatabase.get_radiology_reports
    """
    # NOTE(review): `project_name` and `extract_datetime` are not defined in
    # this scope, so this will raise NameError when called — they presumably
    # need to be passed in by the caller or looked up; TODO confirm.
    # NOTE(review): ParquetExport looks to be imported under TYPE_CHECKING
    # only, which would also fail at runtime — verify the import placement.
    # might need to generate the study ID slug here?
    pe = ParquetExport(project_name, extract_datetime)
    pe.export_radiology(anon_data)


@dataclass
class PatientEHRData:
"""Dataclass for EHR unique to a patient and xray study"""
Expand Down
17 changes: 16 additions & 1 deletion pixl_ehr/src/pixl_ehr/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from fastapi.responses import JSONResponse

from ._databases import PIXLDatabase
from ._processing import process_message
from ._processing import export_radiology_reports, process_message

QUEUE_NAME = "ehr"

Expand Down Expand Up @@ -58,6 +58,21 @@ async def startup_event() -> None:
task.add_done_callback(background_tasks.discard)


@app.post(
    "/export-radiology-as-parquet",
    summary="Copy all radiology reports in the PIXL DB to a parquet file",
)
def export_radiology_as_parquet() -> None:
    """Batch export of all radiology reports in PIXL DB to a parquet file."""
    # Open questions from the original author:
    # - can we check the queue to make sure it's empty, or that the correct
    #   number of entries are in the PIXL DB?
    # - extract IDs are not stored, so we just hope there's only data from
    #   one extract here; should this command be told which extract ID to use?
    reports = PIXLDatabase().get_radiology_reports()
    export_radiology_reports(reports)


@app.get(
"/az-copy-current",
summary="Copy the current state of the PIXL anon EHR schema to azure",
Expand Down
Loading