diff --git a/cli/README.md b/cli/README.md index 6745c73a5..9351d4e32 100644 --- a/cli/README.md +++ b/cli/README.md @@ -1,11 +1,10 @@ # PIXL Driver + Command line interface -The PIXL CLI driver provides functionality to populate a queue with messages -containing information required to run electronic health queries against the -EMAP star database and the PACS image system. Once a set of queues are -populated the consumers can be started, updated and the system extractions -stopped cleanly. - +The PIXL CLI driver provides functionality to populate a queue with messages +containing information required to run electronic health queries against the +EMAP star database and the PACS image system. Once a set of queues are +populated the consumers can be started, updated and the system extractions +stopped cleanly. ## Installation @@ -14,43 +13,56 @@ pip install -e ../pixl_core/ . ``` ## Test + ```bash ./tests/run-tests.sh ``` - ## Usage > **Note** > Services must be started prior to using the CLI See the commands and subcommands with + ```bash pixl --help ``` Populate queue for PACS and EHR extraction + ```bash -pixl populate .csv +pixl populate ``` -where the csv file contains MRN, accession numbers and timestamps in the format: -| VAL_ID | ACCESSION_NUMBER | STUDY_INSTANCE_UID | STUDY_DATE | ... | -|--------|------------------|--------------------|------------------|-----| -| X | Y | Z | 29/02/2010 05:12 | | +where `parquet_dir` contains at least the following files: +```sh +parquet_dir +├── extract_summary.json +├── private +│ ├── PERSON_LINKS.parquet +│ └── PROCEDURE_OCCURRENCE_LINKS.parquet +└── public + └── PROCEDURE_OCCURRENCE.parquet +``` Start the PACS extraction + ```bash pixl start --queues pacs ``` + and equivalently the EHR extraction + ```bash pixl start --queues ehr ``` + Use `pixl start --help` for information. 
Stop PACS and EHR database extraction + ```bash pixl stop ``` diff --git a/cli/pyproject.toml b/cli/pyproject.toml index 9f5764967..fe40e5de7 100644 --- a/cli/pyproject.toml +++ b/cli/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "click==8.1.3", "coloredlogs==15.0.1", "pandas==1.5.1", + "pyarrow==14.0.1", "PyYAML==6.0" ] diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index 30297211e..75763b8b2 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -16,6 +16,7 @@ import datetime import json import os +from operator import attrgetter from pathlib import Path from typing import Any, Optional @@ -23,9 +24,9 @@ import pandas as pd import requests import yaml +from core.patient_queue.message import Message, deserialise from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer -from core.patient_queue.utils import deserialise, serialise from ._logging import logger, set_log_level from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty @@ -56,35 +57,51 @@ def cli(*, debug: bool) -> None: @cli.command() -@click.argument("csv_filename", type=click.Path(exists=True)) @click.option( "--queues", default="ehr,pacs", show_default=True, - help="Comma seperated list of queues to populate with messages generated from the " ".csv file", + help="Comma seperated list of queues to populate with messages generated from the " + "input file(s)", ) @click.option( "--restart/--no-restart", show_default=True, default=True, - help="Restart from a saved state. Otherwise will use the given .csv file", + help="Restart from a saved state. 
Otherwise will use the given input file(s)", ) -def populate(csv_filename: str, queues: str, *, restart: bool) -> None: - """Populate a (set of) queue(s) from a csv file""" - logger.info(f"Populating queue(s) {queues} from {csv_filename}") - +@click.argument( + "parquet-dir", required=True, type=click.Path(path_type=Path, exists=True, file_okay=False) +) +def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None: + """ + Populate a (set of) queue(s) from a parquet file directory + + PARQUET-DIR: Directory containing the public and private parquet input files and an + extract_summary.json log file. + It's expected that the directory structure will be: + + PARQUET-DIR + ├── private + │ ├── PERSON_LINKS.parquet + │ └── PROCEDURE_OCCURRENCE_LINKS.parquet + ├── public + │ └── PROCEDURE_OCCURRENCE.parquet + └── extract_summary.json + """ + logger.info(f"Populating queue(s) {queues} from {parquet_dir}") for queue in queues.split(","): with PixlProducer(queue_name=queue, **config["rabbitmq"]) as producer: state_filepath = state_filepath_for_queue(queue) if state_filepath.exists() and restart: logger.info(f"Extracting messages from state: {state_filepath}") inform_user_that_queue_will_be_populated_from(state_filepath) - messages = Messages.from_state_file(state_filepath) - else: - messages = messages_from_csv(Path(csv_filename)) + messages = messages_from_state_file(state_filepath) + elif parquet_dir is not None: + messages = messages_from_parquet(parquet_dir) remove_file_if_it_exists(state_filepath) # will be stale - producer.publish(sorted(messages, key=study_date_from_serialised)) + producer.publish(sorted(messages, key=attrgetter("study_datetime"))) @cli.command() @@ -268,81 +285,107 @@ def state_filepath_for_queue(queue_name: str) -> Path: return Path(f"{queue_name.replace('/', '_')}.state") -class Messages(list): +def messages_from_state_file(filepath: Path) -> list[Message]: """ - Class to represent messages + Return messages from a state file path - 
Methods - ------- - from_state_file(cls, filepath) - Return messages from a state file path + :param filepath: Path for state file to be read + :return: A list of Message objects containing all the messages from the state file """ + logger.info(f"Creating messages from {filepath}") + if not filepath.exists(): + raise FileNotFoundError + if filepath.suffix != ".state": + msg = f"Invalid file suffix for {filepath}. Expected .state" + raise ValueError(msg) - @classmethod - def from_state_file(cls, filepath: Path) -> "Messages": - """ - Return messages from a state file path - - :param filepath: Path for state file to be read - :return: A Messages object containing all the messages from the state file - """ - logger.info(f"Creating messages from {filepath}") - if not filepath.exists(): - raise FileNotFoundError - if filepath.suffix != ".state": - msg = f"Invalid file suffix for {filepath}. Expected .state" - raise ValueError(msg) - - return cls( - [ - line.encode("utf-8") - for line in Path.open(filepath).readlines() - if string_is_non_empty(line) - ] - ) + return [ + deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line) + ] -def messages_from_csv(filepath: Path) -> Messages: +def messages_from_parquet(dir_path: Path) -> list[Message]: """ - Reads patient information from CSV and transforms that into messages. - :param filepath: Path for CSV file to be read + Reads patient information from parquet files within directory structure + and transforms that into messages. 
+ :param dir_path: Path for parquet directory containing private and public + files """ + public_dir = dir_path / "public" + private_dir = dir_path / "private" + log_file = dir_path / "extract_summary.json" + + for d in [public_dir, private_dir]: + if not d.is_dir(): + err_str = f"{d} must exist and be a directory" + raise NotADirectoryError(err_str) + + if not log_file.is_file(): + err_str = f"{log_file} must exist and be a file" + raise FileNotFoundError(err_str) + + # MRN in people.PrimaryMrn: + people = pd.read_parquet(private_dir / "PERSON_LINKS.parquet") + # accession number in accessions.AccesionNumber + accessions = pd.read_parquet(private_dir / "PROCEDURE_OCCURRENCE_LINKS.parquet") + # study_date is in procedure.procdure_date + procedure = pd.read_parquet(public_dir / "PROCEDURE_OCCURRENCE.parquet") + # joining data together + people_procedures = people.merge(procedure, on="person_id") + cohort_data = people_procedures.merge(accessions, on="procedure_occurrence_id") + expected_col_names = [ - "VAL_ID", - "ACCESSION_NUMBER", - "STUDY_INSTANCE_UID", - "STUDY_DATE", + "PrimaryMrn", + "AccessionNumber", + "person_id", + "procedure_date", + "procedure_occurrence_id", ] logger.debug( - f"Extracting messages from {filepath}. Expecting columns to include " + f"Extracting messages from {dir_path}. 
Expecting columns to include " f"{expected_col_names}" ) - # First line is column names - messages_df = pd.read_csv(filepath, header=0, dtype=str) - messages = Messages() - - if list(messages_df.columns)[:4] != expected_col_names: - msg = f"csv file expected to have at least {expected_col_names} as " f"column names" - raise ValueError(msg) - - mrn_col_name, acc_num_col_name, _, dt_col_name = expected_col_names - for _, row in messages_df.iterrows(): - messages.append( - serialise( - mrn=row[mrn_col_name], - accession_number=row[acc_num_col_name], - study_datetime=datetime.datetime.strptime( - row[dt_col_name], "%d/%m/%Y %H:%M" - ).replace(tzinfo=datetime.timezone.utc), + for col in expected_col_names: + if col not in list(cohort_data.columns): + msg = ( + f"parquet files are expected to have at least {expected_col_names} as " + f"column names" ) + raise ValueError(msg) + + ( + mrn_col_name, + acc_num_col_name, + _, + dt_col_name, + procedure_occurrence_id, + ) = expected_col_names + + # Get project name and OMOP ES timestamp from log file + logs = json.load(log_file.open()) + project_name = logs["settings"]["cdm_source_name"] + omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"]) + + messages = [] + + for _, row in cohort_data.iterrows(): + # Create new dict to initialise message + message = Message( + mrn=row[mrn_col_name], + accession_number=row[acc_num_col_name], + study_datetime=row[dt_col_name], + procedure_occurrence_id=row[procedure_occurrence_id], + project_name=project_name, + omop_es_timestamp=omop_es_timestamp, ) + messages.append(message) if len(messages) == 0: - msg = f"Failed to find any messages in {filepath}" + msg = f"Failed to find any messages in {dir_path}" raise ValueError(msg) - logger.debug(f"Created {len(messages)} messages from {filepath}") + logger.info(f"Created {len(messages)} messages from {dir_path}") return messages @@ -405,12 +448,3 @@ def api_config_for_queue(queue_name: str) -> APIConfig: raise ValueError(msg) 
return APIConfig(config[config_key]) - - -def study_date_from_serialised(message: bytes) -> datetime.datetime: - """Get the study date from a serialised message as a datetime""" - result = deserialise(message)["study_datetime"] - if not isinstance(result, datetime.datetime): - msg = "Expected study date to be a datetime. Got %s" - raise TypeError(msg, type(result)) - return result diff --git a/cli/tests/resources/omop/extract_summary.json b/cli/tests/resources/omop/extract_summary.json new file mode 100644 index 000000000..ab66372fc --- /dev/null +++ b/cli/tests/resources/omop/extract_summary.json @@ -0,0 +1,94 @@ +{ + "gitsha":"56e0eba8d098523c99f3c899979096d2c5ed4c5f", + "filesummaries":[ +"CARE_SITE.parquet: 4084 bytes", +"CARE_SITE_BAD.parquet: 3305 bytes", +"CARE_SITE_LINKS.parquet: 1201 bytes", +"CDM_SOURCE.parquet: 5823 bytes", +"CDM_SOURCE_BAD.parquet: 7852 bytes", +"CONDITION_OCCURRENCE.parquet: 6770 bytes", +"CONDITION_OCCURRENCE_BAD.parquet: 5004 bytes", +"CONDITION_OCCURRENCE_LINKS.parquet: 612 bytes", +"DEVICE_EXPOSURE.parquet: 4524 bytes", +"DEVICE_EXPOSURE_BAD.parquet: 4524 bytes", +"DEVICE_EXPOSURE_LINKS.parquet: 682 bytes", +"DRUG_EXPOSURE.parquet: 5782 bytes", +"DRUG_EXPOSURE_BAD.parquet: 3907 bytes", +"DRUG_EXPOSURE_LINKS.parquet: 597 bytes", +"FACT_RELATIONSHIP.parquet: 2167 bytes", +"FACT_RELATIONSHIP_BAD.parquet: 1357 bytes", +"LOCATION.parquet: 1865 bytes", +"LOCATION_BAD.parquet: 1343 bytes", +"LOCATION_LINKS.parquet: 904 bytes", +"MEASUREMENT.parquet: 6742 bytes", +"MEASUREMENT_BAD.parquet: 3982 bytes", +"MEASUREMENT_LINKS.parquet: 2309 bytes", +"OBSERVATION.parquet: 5614 bytes", +"OBSERVATION_BAD.parquet: 3618 bytes", +"OBSERVATION_LINKS.parquet: 1263 bytes", +"OBSERVATION_PERIOD.parquet: 2183 bytes", +"OBSERVATION_PERIOD_BAD.parquet: 1488 bytes", +"OBSERVATION_PERIOD_LINKS.parquet: 606 bytes", +"PERSON.parquet: 5420 bytes", +"PERSON_BAD.parquet: 3614 bytes", +"PERSON_LINKS.parquet: 1953 bytes", +"PROCEDURE_OCCURRENCE.parquet: 5230 bytes", 
+"PROCEDURE_OCCURRENCE_BAD.parquet: 3665 bytes", +"PROCEDURE_OCCURRENCE_LINKS.parquet: 1311 bytes", +"SPECIMEN.parquet: 4873 bytes", +"SPECIMEN_BAD.parquet: 3326 bytes", +"SPECIMEN_LINKS.parquet: 928 bytes", +"VISIT_DETAIL.parquet: 3228 bytes", +"VISIT_DETAIL_BAD.parquet: 3228 bytes", +"VISIT_DETAIL_LINKS.parquet: 435 bytes", +"VISIT_OCCURRENCE.parquet: 5259 bytes", +"VISIT_OCCURRENCE_BAD.parquet: 3429 bytes", +"VISIT_OCCURRENCE_LINKS.parquet: 1349 bytes" + ], + "datetime":"2023-12-07 14:08:58", + "user":"John Watts", + "settings":{ + "site":"UCLH", + "cdm_source_name":"Test Extract - UCLH OMOP CDM", + "cdm_source_abbreviation":"Test UCLH OMOP", + "project_logic":"mock_project_settings/project_logic.R", + "min_date": 20100101, + "max_date": 20241231, + "enabled_sources":"epic", + "output_format":"parquet", + "OMOP_version": 60, + "cohort":{ + "file":"settings/mock_project_settings/mock_cohort.csv", + "exclude_NDOO": true, + "exclude_confidential": true, + "min_age_at_encounter_start": 16, + "max_age_at_encounter_start": 80 + }, + "keep_source_vals": false, + "person":{ + "include_nhs_number": false, + "include_mrn": false, + "keep_day_of_birth": false, + "keep_month_of_birth": true, + "include_gp_as_primary_care_site": false + }, + "observation_period_strategy":"visit_span", + "local_timezone":"Europe/London", + "output_timezone":"GMT", + "condition_occurrence":{ + "include_sexual_health": false, + "allow_icd_as_std": true + }, + "measurements":{ + "include_file":null, + "include_measurement_concept_ids":null, + "non_generic_numeric_labs": 3040104 + }, + "location":{ + "keep_only_zip": true, + "replace_postcode_with_LSOA": true + }, + "mapping_effective_date": 19698, + "name":"mock_project_settings" + } +} \ No newline at end of file diff --git a/cli/tests/resources/omop/private/PERSON_LINKS.parquet b/cli/tests/resources/omop/private/PERSON_LINKS.parquet new file mode 100644 index 000000000..bef07a1a2 Binary files /dev/null and 
b/cli/tests/resources/omop/private/PERSON_LINKS.parquet differ diff --git a/cli/tests/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet b/cli/tests/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet new file mode 100644 index 000000000..97a953bd4 Binary files /dev/null and b/cli/tests/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet differ diff --git a/cli/tests/resources/omop/public/CARE_SITE.parquet b/cli/tests/resources/omop/public/CARE_SITE.parquet deleted file mode 100644 index 18da482a3..000000000 Binary files a/cli/tests/resources/omop/public/CARE_SITE.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/CDM_SOURCE.parquet b/cli/tests/resources/omop/public/CDM_SOURCE.parquet deleted file mode 100644 index c8f6c1702..000000000 Binary files a/cli/tests/resources/omop/public/CDM_SOURCE.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/CONDITION_OCCURRENCE.parquet b/cli/tests/resources/omop/public/CONDITION_OCCURRENCE.parquet deleted file mode 100644 index c41ba6b4e..000000000 Binary files a/cli/tests/resources/omop/public/CONDITION_OCCURRENCE.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/DEVICE_EXPOSURE.parquet b/cli/tests/resources/omop/public/DEVICE_EXPOSURE.parquet deleted file mode 100644 index 744754fe9..000000000 Binary files a/cli/tests/resources/omop/public/DEVICE_EXPOSURE.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/DRUG_EXPOSURE.parquet b/cli/tests/resources/omop/public/DRUG_EXPOSURE.parquet deleted file mode 100644 index 4ed30b556..000000000 Binary files a/cli/tests/resources/omop/public/DRUG_EXPOSURE.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/FACT_RELATIONSHIP.parquet b/cli/tests/resources/omop/public/FACT_RELATIONSHIP.parquet deleted file mode 100644 index 93b22f7b7..000000000 Binary files a/cli/tests/resources/omop/public/FACT_RELATIONSHIP.parquet and /dev/null differ diff --git 
a/cli/tests/resources/omop/public/LOCATION.parquet b/cli/tests/resources/omop/public/LOCATION.parquet deleted file mode 100644 index 49f8f3064..000000000 Binary files a/cli/tests/resources/omop/public/LOCATION.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/MEASUREMENT.parquet b/cli/tests/resources/omop/public/MEASUREMENT.parquet deleted file mode 100644 index 8196a80eb..000000000 Binary files a/cli/tests/resources/omop/public/MEASUREMENT.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/OBSERVATION.parquet b/cli/tests/resources/omop/public/OBSERVATION.parquet deleted file mode 100644 index 6ae40ad48..000000000 Binary files a/cli/tests/resources/omop/public/OBSERVATION.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/OBSERVATION_PERIOD.parquet b/cli/tests/resources/omop/public/OBSERVATION_PERIOD.parquet deleted file mode 100644 index f0eddab84..000000000 Binary files a/cli/tests/resources/omop/public/OBSERVATION_PERIOD.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/PERSON.parquet b/cli/tests/resources/omop/public/PERSON.parquet deleted file mode 100644 index ae693885e..000000000 Binary files a/cli/tests/resources/omop/public/PERSON.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/SPECIMEN.parquet b/cli/tests/resources/omop/public/SPECIMEN.parquet deleted file mode 100644 index 9a7dec739..000000000 Binary files a/cli/tests/resources/omop/public/SPECIMEN.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/VISIT_DETAIL.parquet b/cli/tests/resources/omop/public/VISIT_DETAIL.parquet deleted file mode 100644 index da20bd662..000000000 Binary files a/cli/tests/resources/omop/public/VISIT_DETAIL.parquet and /dev/null differ diff --git a/cli/tests/resources/omop/public/VISIT_OCCURRENCE.parquet b/cli/tests/resources/omop/public/VISIT_OCCURRENCE.parquet deleted file mode 100644 index 5b65fbf0a..000000000 Binary files 
a/cli/tests/resources/omop/public/VISIT_OCCURRENCE.parquet and /dev/null differ diff --git a/cli/tests/run-tests.sh b/cli/tests/run-tests.sh index 5072d2c49..a036af870 100755 --- a/cli/tests/run-tests.sh +++ b/cli/tests/run-tests.sh @@ -24,7 +24,7 @@ THIS_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) PACKAGE_DIR="${THIS_DIR%/*}" cd "$PACKAGE_DIR" || exit -pip install "../pixl_core/[test]" ".[test]" +pip install -e "../pixl_core/[test]" ".[test]" cd tests/ docker compose up -d diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py new file mode 100644 index 000000000..c092cc489 --- /dev/null +++ b/cli/tests/test_messages_from_parquet.py @@ -0,0 +1,67 @@ +# Copyright (c) University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for reading cohorts from parquet files.""" + +import datetime +from pathlib import Path + +from core.patient_queue.message import Message +from pixl_cli.main import messages_from_parquet + + +def test_messages_from_parquet(resources: Path) -> None: + """ + Test that the messages are as expected, given the test parquet files. + The test data doesn't have any "difficult" cases in it, eg. people without procedures. 
+ """ + omop_parquet_dir = resources / "omop" + messages = messages_from_parquet(omop_parquet_dir) + assert all(isinstance(msg, Message) for msg in messages) + + expected_messages = [ + Message( + mrn="12345678", + accession_number="12345678", + study_datetime=datetime.date.fromisoformat("2021-07-01"), + procedure_occurrence_id=1, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), + ), + Message( + mrn="12345678", + accession_number="ABC1234567", + study_datetime=datetime.date.fromisoformat("2021-07-01"), + procedure_occurrence_id=2, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), + ), + Message( + mrn="987654321", + accession_number="ABC1234560", + study_datetime=datetime.date.fromisoformat("2020-05-01"), + procedure_occurrence_id=3, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), + ), + Message( + mrn="5020765", + accession_number="MIG0234560", + study_datetime=datetime.date.fromisoformat("2015-05-01"), + procedure_occurrence_id=4, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), + ), + ] + + assert messages == expected_messages diff --git a/cli/tests/test_queue_start_and_stop.py b/cli/tests/test_queue_start_and_stop_parquet.py similarity index 75% rename from cli/tests/test_queue_start_and_stop.py rename to cli/tests/test_queue_start_and_stop_parquet.py index 5e1103c82..4affebb57 100644 --- a/cli/tests/test_queue_start_and_stop.py +++ b/cli/tests/test_queue_start_and_stop_parquet.py @@ -19,20 +19,22 @@ from pixl_cli.main import populate, queue_is_up, stop -def test_populate_queue(queue_name: str = "test_populate") -> None: +def test_populate_queue_parquet(resources: Path, queue_name: str = "test_populate") -> None: """Checks that patient queue can be populated without 
error.""" + omop_parquet_dir = str(resources / "omop") runner = CliRunner() - result = runner.invoke(populate, args=["test.csv", "--queues", queue_name]) + result = runner.invoke(populate, args=[omop_parquet_dir, "--queues", queue_name]) assert result.exit_code == 0 -def test_down_queue(queue_name: str = "test_down") -> None: +def test_down_queue_parquet(resources: Path, queue_name: str = "test_down") -> None: """ Checks that after the queue has been sent a stop signal, the queue has been emptied. """ + omop_parquet_dir = str(resources / "omop") runner = CliRunner() - _ = runner.invoke(populate, args=["test.csv", "--queues", queue_name]) + _ = runner.invoke(populate, args=[omop_parquet_dir, "--queues", queue_name]) _ = runner.invoke(stop, args=["--queues", queue_name]) state_path = Path(f"{queue_name}.state") diff --git a/pixl_core/pyproject.toml b/pixl_core/pyproject.toml index 8fdd72727..056307e6b 100644 --- a/pixl_core/pyproject.toml +++ b/pixl_core/pyproject.toml @@ -18,7 +18,8 @@ dependencies = [ "pika==1.3.1", "aio_pika==8.2.4", "environs==9.5.0", - "requests==2.31.0" + "requests==2.31.0", + "jsonpickle==3.0.2" ] [project.optional-dependencies] diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py new file mode 100644 index 000000000..595be68c0 --- /dev/null +++ b/pixl_core/src/core/patient_queue/message.py @@ -0,0 +1,74 @@ +# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +"""Classes to represent messages in the patient queue.""" + +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import Any + +from jsonpickle import decode, encode + +logger = logging.getLogger(__name__) + + +@dataclass +class Message: + """Class to represent a message containing the relevant information for a study.""" + + mrn: str + accession_number: str + study_datetime: datetime + procedure_occurrence_id: str + project_name: str + omop_es_timestamp: datetime + + def serialise(self, *, deserialisable: bool = True) -> bytes: + """ + Serialise the message into a JSON string and convert to bytes. + + :param deserialisable: If True, the serialised message will be deserialisable, by setting + the unpicklable flag to False in jsonpickle.encode(), meaning that the original Message + object can be recovered by `deserialise()`. If False, calling `deserialise()` on the + serialised message will return a dictionary. + """ + msg = ( + "Serialising message with\n" + " * patient id: %s\n" + " * accession number: %s\n" + " * timestamp: %s\n" + " * procedure_occurrence_id: %s\n", + " * project_name: %s\n * omop_es_timestamp: %s", + self.mrn, + self.accession_number, + self.study_datetime, + self.procedure_occurrence_id, + self.project_name, + self.omop_es_timestamp, + ) + logger.debug(msg) + + return str.encode(encode(self, unpicklable=deserialisable)) + + +def deserialise(serialised_msg: bytes) -> Any: + """ + Deserialise a message from a bytes-encoded JSON string. + If the message was serialised with `deserialisable=True`, the original Message object will be + returned. Otherwise, a dictionary will be returned. + + :param serialised_msg: The serialised message. 
+ """ + return decode(serialised_msg) # noqa: S301, since we control the input, so no security risks diff --git a/pixl_core/src/core/patient_queue/producer.py b/pixl_core/src/core/patient_queue/producer.py index 38f5ec5d1..b840a4584 100644 --- a/pixl_core/src/core/patient_queue/producer.py +++ b/pixl_core/src/core/patient_queue/producer.py @@ -16,6 +16,8 @@ import logging from time import sleep +from core.patient_queue.message import Message + from ._base import PixlBlockingInterface LOGGER = logging.getLogger(__name__) @@ -24,7 +26,7 @@ class PixlProducer(PixlBlockingInterface): """Generic publisher for RabbitMQ""" - def publish(self, messages: list[bytes]) -> None: + def publish(self, messages: list[Message]) -> None: """ Sends a list of serialised messages to a queue. :param messages: list of messages to be sent to queue @@ -32,11 +34,15 @@ def publish(self, messages: list[bytes]) -> None: LOGGER.debug("Publishing %i messages to queue: %s", len(messages), self.queue_name) if len(messages) > 0: for msg in messages: + LOGGER.debug("Serialising message") + serialised_msg = msg.serialise() LOGGER.debug("Preparing to publish") - self._channel.basic_publish(exchange="", routing_key=self.queue_name, body=msg) + self._channel.basic_publish( + exchange="", routing_key=self.queue_name, body=serialised_msg + ) # RabbitMQ can miss-order messages if there is not a sufficient delay sleep(0.1) - LOGGER.debug("Message %s published to queue %s", msg.decode(), self.queue_name) + LOGGER.debug("Message %s published to queue %s", msg, self.queue_name) else: LOGGER.debug("List of messages is empty so nothing will be published to queue.") diff --git a/pixl_core/src/core/patient_queue/subscriber.py b/pixl_core/src/core/patient_queue/subscriber.py index cf5361297..bc868ee6b 100644 --- a/pixl_core/src/core/patient_queue/subscriber.py +++ b/pixl_core/src/core/patient_queue/subscriber.py @@ -21,6 +21,7 @@ import aio_pika +from core.patient_queue.message import Message, deserialise from 
core.token_buffer.tokens import TokenBucket from ._base import PixlBlockingInterface, PixlQueueInterface @@ -52,7 +53,7 @@ async def __aenter__(self) -> "PixlConsumer": self._queue = await self._channel.declare_queue(self.queue_name) return self - async def run(self, callback: Callable[[bytes], Awaitable[None]]) -> None: + async def run(self, callback: Callable[[Message], Awaitable[None]]) -> None: """ Creates loop that waits for messages from producer and processes them as they appear. @@ -73,7 +74,7 @@ async def run(self, callback: Callable[[bytes], Awaitable[None]]) -> None: try: await asyncio.sleep(0.01) # Avoid very fast callbacks - await callback(message.body) + await callback(deserialise(message.body)) except Exception: LOGGER.exception( "Failed to process %s" "Not re-queuing message", diff --git a/pixl_core/src/core/patient_queue/utils.py b/pixl_core/src/core/patient_queue/utils.py deleted file mode 100644 index 1ecb3d86c..000000000 --- a/pixl_core/src/core/patient_queue/utils.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -"""Utility functions""" - -import json -import logging -from datetime import datetime - -logger = logging.getLogger(__name__) - - -def deserialise(message_body: bytes) -> dict: - """Returns the de-serialised message in JSON format.""" - logger.debug("De-serialising: %s", message_body.decode()) - data = dict(json.loads(message_body.decode())) - if "study_datetime" in data: - data["study_datetime"] = datetime.fromisoformat(data["study_datetime"]) - return data - - -def serialise(mrn: str, accession_number: str, study_datetime: datetime) -> bytes: - """ - Returns serialised message from patient id, accession number and date of study. - :param mrn: patient identifier - :param accession_number: accession number - :param study_datetime: date and time of the study - :returns: JSON formatted message - """ - logger.debug( - "Serialising message with patient id %s, " "accession number: %s and timestamp %s", - mrn, - accession_number, - study_datetime, - ) - return json.dumps( - { - "mrn": mrn, - "accession_number": accession_number, - "study_datetime": study_datetime.isoformat(), - } - ).encode("utf-8") diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py new file mode 100644 index 000000000..8471cfdc4 --- /dev/null +++ b/pixl_core/tests/patient_queue/test_message.py @@ -0,0 +1,47 @@ +# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import datetime + +from core.patient_queue.message import Message, deserialise + +msg = Message( + mrn="111", + accession_number="123", + study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( + tzinfo=datetime.timezone.utc + ), + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp=datetime.datetime.strptime("Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p").replace( + tzinfo=datetime.timezone.utc + ), +) + + +def test_serialise() -> None: + """Checks that messages can be correctly serialised""" + msg_body = msg.serialise(deserialisable=False) + assert ( + msg_body == b'{"mrn": "111", "accession_number": "123", ' + b'"study_datetime": "2022-11-22T13:33:00+00:00", ' + b'"procedure_occurrence_id": "234", ' + b'"project_name": "test project", ' + b'"omop_es_timestamp": "2023-12-07T14:08:00+00:00"}' + ) + + +def test_deserialise() -> None: + """Checks if deserialised messages are the same as the original""" + serialised_msg = msg.serialise() + assert deserialise(serialised_msg) == msg diff --git a/pixl_core/tests/patient_queue/test_producer.py b/pixl_core/tests/patient_queue/test_producer.py index 2aa73691e..1aca4cda4 100644 --- a/pixl_core/tests/patient_queue/test_producer.py +++ b/pixl_core/tests/patient_queue/test_producer.py @@ -12,9 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import pytest +from core.patient_queue.message import Message from core.patient_queue.producer import PixlProducer TEST_QUEUE = "test_publish" +TEST_MESSAGE = Message( + mrn="111", + accession_number="123", + study_datetime="2022-11-22T13:33:00+00:00", + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp="2023-12-07T14:08:00+00:00", +) @pytest.mark.pika() @@ -32,7 +41,7 @@ def test_publish() -> None: """ with PixlProducer(queue_name=TEST_QUEUE) as pp: pp.clear_queue() - pp.publish(messages=[b"test"]) + pp.publish(messages=[TEST_MESSAGE]) with PixlProducer(queue_name=TEST_QUEUE) as pp: assert pp.message_count == 1 diff --git a/pixl_core/tests/patient_queue/test_subscriber.py b/pixl_core/tests/patient_queue/test_subscriber.py index e2a8c6b1d..32910dedb 100644 --- a/pixl_core/tests/patient_queue/test_subscriber.py +++ b/pixl_core/tests/patient_queue/test_subscriber.py @@ -18,12 +18,21 @@ from unittest import TestCase import pytest +from core.patient_queue.message import Message from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer, PixlConsumer from core.token_buffer.tokens import TokenBucket TEST_QUEUE = "test_consume" -MESSAGE_BODY = b"test" +TEST_MESSAGE = Message( + mrn="111", + accession_number="123", + study_datetime="2022-11-22T13:33:00+00:00", + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp="2023-12-07T14:08:00+00:00", +) + counter = 0 @@ -52,17 +61,17 @@ async def test_create(self) -> None: """Checks consume is working.""" global counter # noqa: PLW0602 with PixlProducer(queue_name=TEST_QUEUE) as pp: - pp.publish(messages=[MESSAGE_BODY]) + pp.publish(messages=[TEST_MESSAGE]) async with PixlConsumer(queue_name=TEST_QUEUE, token_bucket=TokenBucket()) as pc: - async def consume(msg: bytes) -> None: + async def consume(msg: Message) -> None: """ Increases counter when message is downloaded. 
:param msg: body of the message, though not needed :returns: the increased counter, though here only once """ - if str(msg) != "": + if str(msg.serialise()) != "": global counter counter += 1 @@ -78,7 +87,7 @@ def test_consume_all() -> None: graceful shutdown. """ with PixlProducer(queue_name=TEST_QUEUE) as pp: - pp.publish(messages=[MESSAGE_BODY, MESSAGE_BODY]) + pp.publish(messages=[TEST_MESSAGE, TEST_MESSAGE]) with PixlBlockingConsumer(queue_name=TEST_QUEUE) as bc: counter_bc = bc.consume_all(timeout_in_seconds=2, file_path=Path("test_producer.csv")) diff --git a/pixl_core/tests/patient_queue/test_utils.py b/pixl_core/tests/patient_queue/test_utils.py deleted file mode 100644 index 7e2d0c19f..000000000 --- a/pixl_core/tests/patient_queue/test_utils.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import datetime -import json - -from core.patient_queue.utils import deserialise, serialise - - -def test_serialise() -> None: - """Checks that messages can be correctly serialised""" - msg_body = serialise( - mrn="111", - accession_number="123", - study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( - tzinfo=datetime.timezone.utc - ), - ) - assert ( - msg_body.decode() == '{"mrn": "111", "accession_number": "123", ' - '"study_datetime": "2022-11-22T13:33:00+00:00"}' - ) - - -def test_simple_deserialise() -> None: - """Checks a simple JSON deserialise works""" - assert deserialise((json.dumps({"key": "value"})).encode("utf-8"))["key"] == "value" - - -def test_deserialise_datetime() -> None: - """Checks that datetimes can be correctly serialised""" - timestamp = datetime.datetime.fromordinal(100012) - data = deserialise(serialise(mrn="", accession_number="", study_datetime=timestamp)) - assert data["study_datetime"] == timestamp diff --git a/pixl_ehr/src/pixl_ehr/_processing.py b/pixl_ehr/src/pixl_ehr/_processing.py index b7d47bb75..89a9380f7 100644 --- a/pixl_ehr/src/pixl_ehr/_processing.py +++ b/pixl_ehr/src/pixl_ehr/_processing.py @@ -21,7 +21,7 @@ from typing import Optional import requests -from core.patient_queue.utils import deserialise +from core.patient_queue.message import Message from decouple import config from pixl_ehr._databases import EMAPStar, PIXLDatabase @@ -35,14 +35,14 @@ _this_dir = Path(Path(__file__).parent) -async def process_message(message_body: bytes) -> None: - logger.info("Processing: %s", message_body.decode()) +async def process_message(message: Message) -> None: + logger.info("Processing: %s", message) - raw_data = PatientEHRData.from_message(message_body) + raw_data = PatientEHRData.from_message(message) pixl_db = PIXLDatabase() if pixl_db.contains(raw_data): - logger.info("Messaged has already been processed") + logger.info("Message has already been processed") return emap_star_db = 
EMAPStar() @@ -79,16 +79,15 @@ class PatientEHRData: report_text: Optional[str] = None @classmethod - def from_message(cls, message_body: bytes) -> "PatientEHRData": + def from_message(cls, message: Message) -> "PatientEHRData": """ Create a minimal set of patient EHR data required to start queries from a queue message """ - message_data = deserialise(message_body) self = PatientEHRData( - mrn=message_data["mrn"], - accession_number=message_data["accession_number"], - acquisition_datetime=message_data["study_datetime"], + mrn=message.mrn, + accession_number=message.accession_number, + acquisition_datetime=message.study_datetime, ) logger.debug("Created %s from message data", self) diff --git a/pixl_ehr/tests/test_processing.py b/pixl_ehr/tests/test_processing.py index dff27aff1..d0b933884 100644 --- a/pixl_ehr/tests/test_processing.py +++ b/pixl_ehr/tests/test_processing.py @@ -21,7 +21,7 @@ import datetime import pytest -from core.patient_queue.utils import serialise +from core.patient_queue.message import Message from decouple import config from pixl_ehr._databases import PIXLDatabase, WriteableDatabase from pixl_ehr._processing import process_message @@ -36,6 +36,9 @@ observation_datetime = datetime.datetime.fromisoformat( "1234-01-01" ) # within hours of imaging study +procedure_occurrence_id = "123" +project_name = "test project" +omop_es_timestamp = datetime.datetime.fromisoformat("1234-01-01 00:00:00") date_of_birth = "09/08/0007" sex = "testsexvalue" ethnicity = "testethnicity" @@ -52,12 +55,15 @@ weight_vot_id, height_vot_id, gcs_vot_id = 2222222, 3333333, 4444444 ls_id, lo_id, lr_id, ltd_id = 5555555, 6666666, 7777777, 8888888 -message_body = serialise( +message = Message( mrn=mrn, accession_number=accession_number, study_datetime=datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace( tzinfo=datetime.timezone.utc ), + procedure_occurrence_id=procedure_occurrence_id, + project_name=project_name, + omop_es_timestamp=omop_es_timestamp, ) 
@@ -157,7 +163,7 @@ def insert_data_into_emap_star_schema() -> None: @pytest.mark.asyncio() async def test_message_processing() -> None: insert_data_into_emap_star_schema() - await process_message(message_body) + await process_message(message) pixl_db = QueryablePIXLDB() row = pixl_db.execute_query_string("select * from emap_data.ehr_raw where mrn = %s", [mrn]) diff --git a/pixl_pacs/src/pixl_pacs/_processing.py b/pixl_pacs/src/pixl_pacs/_processing.py index c644d9d6c..af0733444 100644 --- a/pixl_pacs/src/pixl_pacs/_processing.py +++ b/pixl_pacs/src/pixl_pacs/_processing.py @@ -15,10 +15,9 @@ import os from asyncio import sleep from dataclasses import dataclass -from datetime import datetime from time import time -from core.patient_queue.utils import deserialise +from core.patient_queue.message import Message from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc @@ -27,10 +26,10 @@ logger.setLevel(os.environ.get("LOG_LEVEL", "WARNING")) -async def process_message(message_body: bytes) -> None: - logger.info("Processing: %s", message_body.decode()) +async def process_message(message: Message) -> None: + logger.info("Processing: %s", message) - study = ImagingStudy.from_message(message_body) + study = ImagingStudy.from_message(message) orthanc_raw = PIXLRawOrthanc() if study.exists_in(orthanc_raw): @@ -49,7 +48,7 @@ async def process_message(message_body: bytes) -> None: while job_state != "Success": if (time() - start_time) > config("PIXL_DICOM_TRANSFER_TIMEOUT", cast=float): msg = ( - f"Failed to transfer {message_body.decode()} within " + f"Failed to transfer {message} within " f"{config('PIXL_DICOM_TRANSFER_TIMEOUT')} seconds" ) raise TimeoutError(msg) @@ -64,19 +63,20 @@ async def process_message(message_body: bytes) -> None: class ImagingStudy: """Dataclass for EHR unique to a patient and xray study""" - mrn: str - accession_number: str - study_datetime: datetime + message: Message @classmethod - def from_message(cls, 
message_body: bytes) -> "ImagingStudy": - return ImagingStudy(**deserialise(message_body)) + def from_message(cls, message: Message) -> "ImagingStudy": + return ImagingStudy(message=message) @property def orthanc_query_dict(self) -> dict: return { "Level": "Study", - "Query": {"PatientID": self.mrn, "AccessionNumber": self.accession_number}, + "Query": { + "PatientID": self.message.mrn, + "AccessionNumber": self.message.accession_number, + }, } def exists_in(self, node: Orthanc) -> bool: diff --git a/pixl_pacs/tests/test_processing.py b/pixl_pacs/tests/test_processing.py index 72180368c..82084e44b 100644 --- a/pixl_pacs/tests/test_processing.py +++ b/pixl_pacs/tests/test_processing.py @@ -19,7 +19,7 @@ import os import pytest -from core.patient_queue.utils import serialise +from core.patient_queue.message import Message from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc from pixl_pacs._processing import ImagingStudy, process_message @@ -30,12 +30,15 @@ ACCESSION_NUMBER = "abc" PATIENT_ID = "a_patient" -message_body = serialise( +message = Message( mrn=PATIENT_ID, accession_number=ACCESSION_NUMBER, study_datetime=datetime.datetime.strptime("01/01/1234 01:23:45", "%d/%m/%Y %H:%M:%S").replace( tzinfo=datetime.timezone.utc ), + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp=datetime.datetime.fromisoformat("1234-01-01 00:00:00"), ) @@ -70,15 +73,15 @@ def add_image_to_fake_vna(image_filename: str = "test.dcm") -> None: @pytest.mark.asyncio() async def test_image_processing() -> None: add_image_to_fake_vna() - study = ImagingStudy.from_message(message_body) + study = ImagingStudy.from_message(message) orthanc_raw = PIXLRawOrthanc() assert not study.exists_in(orthanc_raw) - await process_message(message_body=message_body) + await process_message(message) assert study.exists_in(orthanc_raw) # TODO: check time last updated after processing again # noqa: FIX002 # is not incremented # 
https://github.com/UCLH-Foundry/PIXL/issues/156 - await process_message(message_body=message_body) + await process_message(message) assert study.exists_in(orthanc_raw) diff --git a/test/data/resources/omop/extract_summary.json b/test/data/resources/omop/extract_summary.json new file mode 100644 index 000000000..ab66372fc --- /dev/null +++ b/test/data/resources/omop/extract_summary.json @@ -0,0 +1,94 @@ +{ + "gitsha":"56e0eba8d098523c99f3c899979096d2c5ed4c5f", + "filesummaries":[ +"CARE_SITE.parquet: 4084 bytes", +"CARE_SITE_BAD.parquet: 3305 bytes", +"CARE_SITE_LINKS.parquet: 1201 bytes", +"CDM_SOURCE.parquet: 5823 bytes", +"CDM_SOURCE_BAD.parquet: 7852 bytes", +"CONDITION_OCCURRENCE.parquet: 6770 bytes", +"CONDITION_OCCURRENCE_BAD.parquet: 5004 bytes", +"CONDITION_OCCURRENCE_LINKS.parquet: 612 bytes", +"DEVICE_EXPOSURE.parquet: 4524 bytes", +"DEVICE_EXPOSURE_BAD.parquet: 4524 bytes", +"DEVICE_EXPOSURE_LINKS.parquet: 682 bytes", +"DRUG_EXPOSURE.parquet: 5782 bytes", +"DRUG_EXPOSURE_BAD.parquet: 3907 bytes", +"DRUG_EXPOSURE_LINKS.parquet: 597 bytes", +"FACT_RELATIONSHIP.parquet: 2167 bytes", +"FACT_RELATIONSHIP_BAD.parquet: 1357 bytes", +"LOCATION.parquet: 1865 bytes", +"LOCATION_BAD.parquet: 1343 bytes", +"LOCATION_LINKS.parquet: 904 bytes", +"MEASUREMENT.parquet: 6742 bytes", +"MEASUREMENT_BAD.parquet: 3982 bytes", +"MEASUREMENT_LINKS.parquet: 2309 bytes", +"OBSERVATION.parquet: 5614 bytes", +"OBSERVATION_BAD.parquet: 3618 bytes", +"OBSERVATION_LINKS.parquet: 1263 bytes", +"OBSERVATION_PERIOD.parquet: 2183 bytes", +"OBSERVATION_PERIOD_BAD.parquet: 1488 bytes", +"OBSERVATION_PERIOD_LINKS.parquet: 606 bytes", +"PERSON.parquet: 5420 bytes", +"PERSON_BAD.parquet: 3614 bytes", +"PERSON_LINKS.parquet: 1953 bytes", +"PROCEDURE_OCCURRENCE.parquet: 5230 bytes", +"PROCEDURE_OCCURRENCE_BAD.parquet: 3665 bytes", +"PROCEDURE_OCCURRENCE_LINKS.parquet: 1311 bytes", +"SPECIMEN.parquet: 4873 bytes", +"SPECIMEN_BAD.parquet: 3326 bytes", +"SPECIMEN_LINKS.parquet: 928 bytes", 
+"VISIT_DETAIL.parquet: 3228 bytes", +"VISIT_DETAIL_BAD.parquet: 3228 bytes", +"VISIT_DETAIL_LINKS.parquet: 435 bytes", +"VISIT_OCCURRENCE.parquet: 5259 bytes", +"VISIT_OCCURRENCE_BAD.parquet: 3429 bytes", +"VISIT_OCCURRENCE_LINKS.parquet: 1349 bytes" + ], + "datetime":"2023-12-07 14:08:58", + "user":"John Watts", + "settings":{ + "site":"UCLH", + "cdm_source_name":"Test Extract - UCLH OMOP CDM", + "cdm_source_abbreviation":"Test UCLH OMOP", + "project_logic":"mock_project_settings/project_logic.R", + "min_date": 20100101, + "max_date": 20241231, + "enabled_sources":"epic", + "output_format":"parquet", + "OMOP_version": 60, + "cohort":{ + "file":"settings/mock_project_settings/mock_cohort.csv", + "exclude_NDOO": true, + "exclude_confidential": true, + "min_age_at_encounter_start": 16, + "max_age_at_encounter_start": 80 + }, + "keep_source_vals": false, + "person":{ + "include_nhs_number": false, + "include_mrn": false, + "keep_day_of_birth": false, + "keep_month_of_birth": true, + "include_gp_as_primary_care_site": false + }, + "observation_period_strategy":"visit_span", + "local_timezone":"Europe/London", + "output_timezone":"GMT", + "condition_occurrence":{ + "include_sexual_health": false, + "allow_icd_as_std": true + }, + "measurements":{ + "include_file":null, + "include_measurement_concept_ids":null, + "non_generic_numeric_labs": 3040104 + }, + "location":{ + "keep_only_zip": true, + "replace_postcode_with_LSOA": true + }, + "mapping_effective_date": 19698, + "name":"mock_project_settings" + } +} \ No newline at end of file diff --git a/test/data/resources/omop/private/PERSON_LINKS.parquet b/test/data/resources/omop/private/PERSON_LINKS.parquet new file mode 100644 index 000000000..bef07a1a2 Binary files /dev/null and b/test/data/resources/omop/private/PERSON_LINKS.parquet differ diff --git a/test/data/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet b/test/data/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet new file mode 100644 index 
000000000..97a953bd4 Binary files /dev/null and b/test/data/resources/omop/private/PROCEDURE_OCCURRENCE_LINKS.parquet differ diff --git a/test/data/resources/omop/public/PROCEDURE_OCCURRENCE.parquet b/test/data/resources/omop/public/PROCEDURE_OCCURRENCE.parquet new file mode 100644 index 000000000..01451206e Binary files /dev/null and b/test/data/resources/omop/public/PROCEDURE_OCCURRENCE.parquet differ diff --git a/test/data/test.csv b/test/data/test.csv deleted file mode 100644 index f6f9fbd2d..000000000 --- a/test/data/test.csv +++ /dev/null @@ -1,2 +0,0 @@ -VAL_ID,ACCESSION_NUMBER,STUDY_INSTANCE_UID,STUDY_DATE -patient_identifier,123456789,c,01/01/2022 00:01 diff --git a/test/run-system-test.sh b/test/run-system-test.sh index dfa8b2a7d..dc70cd51d 100755 --- a/test/run-system-test.sh +++ b/test/run-system-test.sh @@ -26,7 +26,7 @@ cd .. && \ ./scripts/insert_test_data.sh pip install "${PACKAGE_DIR}/pixl_core" "${PACKAGE_DIR}/cli" -pixl populate data/test.csv +pixl populate "${PACKAGE_DIR}/test/data/resources/omop" pixl start sleep 65 # need to wait until the DICOM image is "stable" = 60s ./scripts/check_entry_in_pixl_anon.sh