Copy public extracts upon export of data #201

Merged · 16 commits · Dec 22, 2023
145 changes: 145 additions & 0 deletions cli/src/pixl_cli/_io.py
@@ -0,0 +1,145 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Reading and writing files from PIXL CLI."""
import json
from datetime import datetime
from pathlib import Path

import pandas as pd
from core.omop import OmopExtract
from core.patient_queue.message import Message, deserialise

from pixl_cli._logging import logger
from pixl_cli._utils import string_is_non_empty

# instance of OmopExtract; can be overridden during testing
extract = OmopExtract()


def messages_from_state_file(filepath: Path) -> list[Message]:
"""
Return messages from a state file path

:param filepath: Path for state file to be read
:return: A list of Message objects containing all the messages from the state file
"""
logger.info(f"Creating messages from {filepath}")
if not filepath.exists():
raise FileNotFoundError
if filepath.suffix != ".state":
msg = f"Invalid file suffix for {filepath}. Expected .state"
raise ValueError(msg)

return [
deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line)
]


def copy_parquet_return_logfile_fields(parquet_path: Path) -> tuple[str, datetime]:
"""Copy public parquet file to extracts directory, and return fields from logfile"""
log_file = parquet_path / "extract_summary.json"

logs = json.load(log_file.open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.fromisoformat(logs["datetime"])
project_name_slug = extract.copy_to_exports(parquet_path, project_name, omop_es_timestamp)
return project_name_slug, omop_es_timestamp


def messages_from_parquet(
dir_path: Path, project_name: str, omop_es_timestamp: datetime
) -> list[Message]:
"""
Reads patient information from parquet files within a directory structure
and transforms it into messages.

:param dir_path: Path for parquet directory containing private and public files
:param project_name: Name of the project; should be a slug so it can match the export directory
:param omop_es_timestamp: Datetime when OMOP ES ran the extract
"""
public_dir = dir_path / "public"
private_dir = dir_path / "private"

cohort_data = _check_and_parse_parquet(private_dir, public_dir)

expected_col_names = [
"PrimaryMrn",
"AccessionNumber",
"person_id",
"procedure_date",
"procedure_occurrence_id",
]
_raise_if_column_names_not_found(cohort_data, expected_col_names)

(
mrn_col_name,
acc_num_col_name,
_,
dt_col_name,
procedure_occurrence_id,
) = expected_col_names

messages = []

for _, row in cohort_data.iterrows():
message = Message(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_date=row[dt_col_name],
procedure_occurrence_id=row[procedure_occurrence_id],
project_name=project_name,
omop_es_timestamp=omop_es_timestamp,
)
messages.append(message)

if len(messages) == 0:
msg = f"Failed to find any messages in {dir_path}"
raise ValueError(msg)

logger.info(f"Created {len(messages)} messages from {dir_path}")
return messages


def _check_and_parse_parquet(private_dir: Path, public_dir: Path) -> pd.DataFrame:
for d in [public_dir, private_dir]:
if not d.is_dir():
err_str = f"{d} must exist and be a directory"
raise NotADirectoryError(err_str)

# MRN in people.PrimaryMrn:
people = pd.read_parquet(private_dir / "PERSON_LINKS.parquet")
# accession number in accessions.AccessionNumber
accessions = pd.read_parquet(private_dir / "PROCEDURE_OCCURRENCE_LINKS.parquet")
# study_date is in procedure.procedure_date
procedure = pd.read_parquet(public_dir / "PROCEDURE_OCCURRENCE.parquet")
# joining data together
people_procedures = people.merge(procedure, on="person_id")
return people_procedures.merge(accessions, on="procedure_occurrence_id")


def _raise_if_column_names_not_found(
cohort_data: pd.DataFrame, expected_col_names: list[str]
) -> None:
logger.debug(
f"Checking merged parquet files. Expecting columns to include {expected_col_names}"
)
for col in expected_col_names:
if col not in list(cohort_data.columns):
msg = (
f"parquet files are expected to have at least {expected_col_names} as "
f"column names"
)
raise ValueError(msg)
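For reference, a minimal hedged sketch of driving the two new helpers together, assuming an extract directory containing private/ and public/ subdirectories plus extract_summary.json (the layout shown in populate()'s docstring further down); all paths and JSON values here are illustrative:

from pathlib import Path

from pixl_cli._io import copy_parquet_return_logfile_fields, messages_from_parquet

# extract_summary.json need only carry the two fields read above, e.g.:
#   {"settings": {"cdm_source_name": "UCLH OMOP CDM"}, "datetime": "2023-12-07T14:08:58"}
extract_dir = Path("omop-extract")  # hypothetical path

# Copies the public parquet files into the exports area; returns the
# slugified project name and the extract timestamp for tagging messages.
project_slug, omop_es_timestamp = copy_parquet_return_logfile_fields(extract_dir)
messages = messages_from_parquet(extract_dir, project_slug, omop_es_timestamp)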
132 changes: 14 additions & 118 deletions cli/src/pixl_cli/main.py
@@ -13,23 +13,21 @@
# limitations under the License.
"""PIXL command line interface functionality"""

import datetime
import json
import os
from operator import attrgetter
from pathlib import Path
from typing import Any, Optional

import click
import pandas as pd
import requests
import yaml
from core.patient_queue.message import Message, deserialise
from core.patient_queue.producer import PixlProducer
from core.patient_queue.subscriber import PixlBlockingConsumer

from ._io import copy_parquet_return_logfile_fields, messages_from_parquet, messages_from_state_file
from ._logging import logger, set_log_level
from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty
from ._utils import clear_file, remove_file_if_it_exists


def _load_config(filename: str = "pixl_config.yml") -> dict:
@@ -90,18 +88,20 @@ def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None:
└── extract_summary.json
"""
logger.info(f"Populating queue(s) {queues} from {parquet_dir}")
project_name, omop_es_datetime = copy_parquet_return_logfile_fields(parquet_dir)
messages = messages_from_parquet(parquet_dir, project_name, omop_es_datetime)

for queue in queues.split(","):
with PixlProducer(queue_name=queue, **config["rabbitmq"]) as producer:
state_filepath = state_filepath_for_queue(queue)
if state_filepath.exists() and restart:
logger.info(f"Extracting messages from state: {state_filepath}")
inform_user_that_queue_will_be_populated_from(state_filepath)
messages = messages_from_state_file(state_filepath)
elif parquet_dir is not None:
messages = messages_from_parquet(parquet_dir)
state_filepath = state_filepath_for_queue(queue)
if state_filepath.exists() and restart:
logger.info(f"Extracting messages from state: {state_filepath}")
inform_user_that_queue_will_be_populated_from(state_filepath)
messages = messages_from_state_file(state_filepath)

remove_file_if_it_exists(state_filepath) # will be stale
producer.publish(sorted(messages, key=attrgetter("study_datetime")))
remove_file_if_it_exists(state_filepath) # will be stale

with PixlProducer(queue_name=queue, **config["rabbitmq"]) as producer:
producer.publish(sorted(messages, key=attrgetter("study_date")))


@cli.command()
@@ -285,110 +285,6 @@ def state_filepath_for_queue(queue_name: str) -> Path:
return Path(f"{queue_name.replace('/', '_')}.state")


def messages_from_state_file(filepath: Path) -> list[Message]:
"""
Return messages from a state file path

:param filepath: Path for state file to be read
:return: A list of Message objects containing all the messages from the state file
"""
logger.info(f"Creating messages from {filepath}")
if not filepath.exists():
raise FileNotFoundError
if filepath.suffix != ".state":
msg = f"Invalid file suffix for {filepath}. Expected .state"
raise ValueError(msg)

return [
deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line)
]


def messages_from_parquet(dir_path: Path) -> list[Message]:
"""
Reads patient information from parquet files within directory structure
and transforms that into messages.
:param dir_path: Path for parquet directory containing private and public
files
"""
public_dir = dir_path / "public"
private_dir = dir_path / "private"
log_file = dir_path / "extract_summary.json"

for d in [public_dir, private_dir]:
if not d.is_dir():
err_str = f"{d} must exist and be a directory"
raise NotADirectoryError(err_str)

if not log_file.is_file():
err_str = f"{log_file} must exist and be a file"
raise FileNotFoundError(err_str)

# MRN in people.PrimaryMrn:
people = pd.read_parquet(private_dir / "PERSON_LINKS.parquet")
# accession number in accessions.AccesionNumber
accessions = pd.read_parquet(private_dir / "PROCEDURE_OCCURRENCE_LINKS.parquet")
# study_date is in procedure.procdure_date
procedure = pd.read_parquet(public_dir / "PROCEDURE_OCCURRENCE.parquet")
# joining data together
people_procedures = people.merge(procedure, on="person_id")
cohort_data = people_procedures.merge(accessions, on="procedure_occurrence_id")

expected_col_names = [
"PrimaryMrn",
"AccessionNumber",
"person_id",
"procedure_date",
"procedure_occurrence_id",
]
logger.debug(
f"Extracting messages from {dir_path}. Expecting columns to include "
f"{expected_col_names}"
)

for col in expected_col_names:
if col not in list(cohort_data.columns):
msg = (
f"parquet files are expected to have at least {expected_col_names} as "
f"column names"
)
raise ValueError(msg)

(
mrn_col_name,
acc_num_col_name,
_,
dt_col_name,
procedure_occurrence_id,
) = expected_col_names

# Get project name and OMOP ES timestamp from log file
logs = json.load(log_file.open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"])

messages = []

for _, row in cohort_data.iterrows():
# Create new dict to initialise message
message = Message(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_datetime=row[dt_col_name],
procedure_occurrence_id=row[procedure_occurrence_id],
project_name=project_name,
omop_es_timestamp=omop_es_timestamp,
)
messages.append(message)

if len(messages) == 0:
msg = f"Failed to find any messages in {dir_path}"
raise ValueError(msg)

logger.info(f"Created {len(messages)} messages from {dir_path}")
return messages


def queue_is_up() -> Any:
"""Checks if the queue is up"""
with PixlProducer(queue_name="") as producer:
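The deleted block above (now living in _io.py) shares a state-file convention with state_filepath_for_queue(); a brief hedged illustration, assuming serialised messages are written one per line (the queue name below is hypothetical):

from pixl_cli._io import messages_from_state_file
from pixl_cli.main import state_filepath_for_queue

# Slashes in queue names become underscores in the state filename.
state_file = state_filepath_for_queue("imaging/orthanc")  # Path("imaging_orthanc.state")
if state_file.exists():
    # Each non-empty line deserialises back into a Message; a missing file
    # raises FileNotFoundError and a non-".state" suffix raises ValueError.
    messages = messages_from_state_file(state_file)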
14 changes: 10 additions & 4 deletions cli/tests/conftest.py
@@ -19,11 +19,17 @@
from core.omop import OmopExtract


@pytest.fixture()
def omop_files(tmp_path_factory: pytest.TempPathFactory) -> OmopExtract:
"""Create an OmopExtract instance using a temporary directory"""
@pytest.fixture(autouse=True)
def omop_files(tmp_path_factory: pytest.TempPathFactory, monkeypatch) -> OmopExtract:
"""
Replace production extract instance with one writing to a tmpdir.

:returns OmopExtract: For direct use when the fixture is explicitly requested.
"""
export_dir = tmp_path_factory.mktemp("repo_base")
return OmopExtract(export_dir)
tmpdir_extract = OmopExtract(export_dir)
monkeypatch.setattr("pixl_cli._io.extract", tmpdir_extract)
return tmpdir_extract


@pytest.fixture()
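With the fixture now autouse, every CLI test writes exports to a tmpdir instead of the real exports directory. A short sketch of a test that also requests the fixture explicitly, per its docstring, to assert against the patched module-level instance:

from core.omop import OmopExtract

from pixl_cli import _io


def test_extract_is_patched(omop_files: OmopExtract) -> None:
    """The module-level extract in pixl_cli._io is the tmpdir-backed instance."""
    # monkeypatch in the fixture swapped _io.extract for omop_files, so any
    # copy_to_exports() call during a test lands in the temporary directory.
    assert _io.extract is omop_files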