Parquet input to OS (#186)
* Add -e to pip install command in cli test

* Added flag for csv=True and ran tests for cli

* Added test for parquet directory and messages_from_parquet func

* Use resources fixture to find test parquet files. Make input options
clearer. Tests will still fail due to missing private parquet files.

* Add synthetic "private" OMOP data taken from the issue

* Add dependency for reading parquet files

* Add procedure occurrence id column to message. Use pandas merge instead
of join.
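
A minimal sketch of that merge chain, using toy stand-ins for the real OMOP extracts (column names taken from `messages_from_parquet()` in the diff below):

```python
import pandas as pd

# Toy stand-ins for the private/public parquet extracts
people = pd.DataFrame({"person_id": [1], "PrimaryMrn": ["40800000"]})
procedure = pd.DataFrame(
    {"person_id": [1], "procedure_occurrence_id": [7], "procedure_date": ["2023-12-21"]}
)
accessions = pd.DataFrame({"procedure_occurrence_id": [7], "AccessionNumber": ["ABC123"]})

# merge() joins on arbitrary key columns, whereas join() is index-based
people_procedures = people.merge(procedure, on="person_id")
cohort_data = people_procedures.merge(accessions, on="procedure_occurrence_id")
```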

* Remove option for CSV cohort file

* formatting changes

* Linting fixes

* Fix other uses of modified method

* ruff fix

* Downstream class needs new attribute as well

* Add type annotations to `test_messages_from_parquet()`

* Add project name and OMOP ES timestamp to rabbitmq messages

* Update `serialise()` callers with new arguments

* Add `project_name` and `omop_es_timestamp` fields to `ImagingStudy` class

* Refactor message serialisation and deserialisation (#197)

* Refactor message serialization and deserialization

Adding `Message` and `SerialisedMessage` classes in an attempt to improve information hiding and decoupling.

* Rename `utils.py` -> `message.py`

* Add `decode()` method for `SerialisedMessage`

* Update docstring

* Use new classes in message testing

* Refactor message processing in the CLI

* Refactor `process_message` to use `SerialisedMessage` class in EHR API

* Refactor `process_message` to use `SerialisedMessage` class in imaging API

* Fix `ImagingStudy` initialisation in `ImagingStudy.from_message()`

* Fix imports

* Fix test: access serialised message bodies

* Turn `Message` into a `dataclass`

* Fix failing tests

* Use `jsonpickle` for (de)serializing messages

This also removes the need for the `SerialisedMessage` class
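
As a rough illustration of the final approach — a cut-down stand-in, not the real `Message` definition in `core/patient_queue/message.py`:

```python
import datetime
from dataclasses import dataclass

import jsonpickle


@dataclass
class Message:
    """Cut-down stand-in for the real Message dataclass."""

    mrn: str
    accession_number: str
    study_datetime: datetime.datetime


original = Message(
    mrn="40800000",
    accession_number="ABC123",
    study_datetime=datetime.datetime(2023, 12, 21, tzinfo=datetime.timezone.utc),
)
body = jsonpickle.encode(original)  # JSON string; encode to bytes before publishing
restored = jsonpickle.decode(body)  # noqa: S301 - input is produced internally
assert restored == original
```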

* Fix `test_deserialise_datetime()` so it uses the `Message` class to assert the `study_datetime`

* Add `study_datetime` property for `Message`

* No need to test deserialising individual fields; already covered by `test_deserialise()`, which deserialises the entire object

* Remove `study_date_from_serialised()`, use the class attribute `study_datetime` instead

* Revert "Add `study_datetime` property for `Message`"

This reverts commit 8719153.

* Remove `Messages` class, use `list[Message]` instead

* Add type checking for messages parsed from parquet input

* Update `test_messages_from_parquet()` to use JSON strings instead of bytes

* Update `PixlProducer.publish()` to use a list of Message objects and handle serialisation
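
The producer internals aren't shown in this diff; a hypothetical sketch of the shape of this change, assuming pika and jsonpickle (the real `PixlProducer` lives in `core/patient_queue/producer.py` and may differ):

```python
import jsonpickle
import pika


class ProducerSketch:
    """Hypothetical stand-in for PixlProducer, not the actual implementation."""

    def __init__(self, queue_name: str, host: str = "localhost") -> None:
        self.queue_name = queue_name
        self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
        self._channel = self._connection.channel()
        self._channel.queue_declare(queue=queue_name)

    def publish(self, messages: list) -> None:
        """Serialisation now happens here, so callers pass Message objects directly."""
        for message in messages:
            body = jsonpickle.encode(message).encode("utf-8")
            self._channel.basic_publish(exchange="", routing_key=self.queue_name, body=body)
```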

* Convert JSON string to bytes when serialising

* Revert "Update `test_messages_from_parquet()` to use JSON strings instead of bytes"

This reverts commit 0e4fce4.

* `PixlProducer.publish()` should take a `list[Message]` as input in tests

* Update EHR API to use new `Message` design

* Update imaging API to use new `Message` design

* Update deserialise function to accept bytes-encoded JSON string

* Assert messages against list of `Message`s

* Print dataclass in logs

* `jsonpickle.decode()` can handle bytes so no need to decode first

Also add a note about why we ignore ruff rule S301
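
For example:

```python
import jsonpickle

body = jsonpickle.encode({"mrn": "40800000"}).encode("utf-8")  # bytes, as read off the queue
# decode() accepts bytes directly, so no prior .decode("utf-8") is needed;
# S301 is ignored because the input is produced internally, never untrusted
obj = jsonpickle.decode(body)  # noqa: S301
assert obj == {"mrn": "40800000"}
```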

* Make `deserialisable` a keyword only argument

* Copilot forgot to convert dates to datetimes 🥲

* Refactor PixlConsumer run method to accept Message object as callback parameter and deserialise

* Update consumer in `test_subscriber` to accept Message object instead of bytes

* Format README

* Update documentation with new parquet inputs

* Update system test to use parquet files

* Move `extract_summary.json` one level up and update docs

* Change `parquet-dir` to a click argument instead of option

* `click.argument` doesn't have a `help` parameter

Document `parquet-dir` in the docstrings instead
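
A trimmed-down illustration of the pattern (the full `populate` command is in the diff below):

```python
from pathlib import Path

import click


@click.command()
@click.argument(
    "parquet-dir", required=True, type=click.Path(path_type=Path, exists=True, file_okay=False)
)
def populate(parquet_dir: Path) -> None:
    """
    Populate a (set of) queue(s) from a parquet file directory

    PARQUET-DIR: directory containing the parquet input files. click renders
    this docstring as the command help, which is why the argument is
    documented here rather than via a help= parameter.
    """
    click.echo(f"Reading from {parquet_dir}")
```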

* Slim down parquet test files to only those needed

* Raise more appropriate exceptions for missing file or directory

* Print log message about messages created during regular runs

* Update error message for parquet files

---------

Co-authored-by: ruaridhg <ruaridhg@users.noreply.github.com>
Co-authored-by: Jeremy Stein <j.stein@ucl.ac.uk>
Co-authored-by: Milan Malfait <m.malfait@ucl.ac.uk>
4 people authored Dec 21, 2023
1 parent 592a893 commit 573dea7
Showing 42 changed files with 595 additions and 235 deletions.
36 changes: 24 additions & 12 deletions cli/README.md
@@ -1,11 +1,10 @@
# PIXL Driver + Command line interface

The PIXL CLI driver provides functionality to populate a queue with messages
containing information required to run electronic health queries against the
EMAP star database and the PACS image system. Once a set of queues are
populated the consumers can be started, updated and the system extractions
stopped cleanly.

## Installation

@@ -14,43 +13,56 @@ pip install -e ../pixl_core/ .
```

## Test

```bash
./tests/run-tests.sh
```


## Usage

> **Note**
> Services must be started prior to using the CLI

See the commands and subcommands with

```bash
pixl --help
```

Populate queue for PACS and EHR extraction

```bash
pixl populate <filename>.csv
pixl populate </path/to/parquet_dir>
```
where the csv file contains MRN, accession numbers and timestamps in the format:

| VAL_ID | ACCESSION_NUMBER | STUDY_INSTANCE_UID | STUDY_DATE | ... |
|--------|------------------|--------------------|------------------|-----|
| X | Y | Z | 29/02/2010 05:12 | |
where `parquet_dir` contains at least the following files:

```sh
parquet_dir
├── extract_summary.json
├── private
│   ├── PERSON_LINKS.parquet
│   └── PROCEDURE_OCCURRENCE_LINKS.parquet
└── public
    └── PROCEDURE_OCCURRENCE.parquet
```
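
The project name and OMOP ES timestamp attached to each message come from `extract_summary.json`. A sketch of the two fields the CLI reads (see `messages_from_parquet()` in `cli/src/pixl_cli/main.py`):

```python
import datetime
import json
from pathlib import Path

logs = json.load((Path("/path/to/parquet_dir") / "extract_summary.json").open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"])
```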

Start the PACS extraction

```bash
pixl start --queues pacs
```

and equivalently the EHR extraction

```bash
pixl start --queues ehr
```

Use `pixl start --help` for information.

Stop PACS and EHR database extraction

```bash
pixl stop
```
1 change: 1 addition & 0 deletions cli/pyproject.toml
@@ -15,6 +15,7 @@ dependencies = [
"click==8.1.3",
"coloredlogs==15.0.1",
"pandas==1.5.1",
"pyarrow==14.0.1",
"PyYAML==6.0"
]

186 changes: 110 additions & 76 deletions cli/src/pixl_cli/main.py
@@ -16,16 +16,17 @@
import datetime
import json
import os
from operator import attrgetter
from pathlib import Path
from typing import Any, Optional

import click
import pandas as pd
import requests
import yaml
from core.patient_queue.message import Message, deserialise
from core.patient_queue.producer import PixlProducer
from core.patient_queue.subscriber import PixlBlockingConsumer
from core.patient_queue.utils import deserialise, serialise

from ._logging import logger, set_log_level
from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty
@@ -56,35 +57,51 @@ def cli(*, debug: bool) -> None:


@cli.command()
@click.argument("csv_filename", type=click.Path(exists=True))
@click.option(
"--queues",
default="ehr,pacs",
show_default=True,
help="Comma seperated list of queues to populate with messages generated from the " ".csv file",
help="Comma seperated list of queues to populate with messages generated from the "
"input file(s)",
)
@click.option(
"--restart/--no-restart",
show_default=True,
default=True,
help="Restart from a saved state. Otherwise will use the given .csv file",
help="Restart from a saved state. Otherwise will use the given input file(s)",
)
def populate(csv_filename: str, queues: str, *, restart: bool) -> None:
"""Populate a (set of) queue(s) from a csv file"""
logger.info(f"Populating queue(s) {queues} from {csv_filename}")

@click.argument(
"parquet-dir", required=True, type=click.Path(path_type=Path, exists=True, file_okay=False)
)
def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None:
"""
Populate a (set of) queue(s) from a parquet file directory
PARQUET-DIR: Directory containing the public and private parquet input files and an
extract_summary.json log file.
It's expected that the directory structure will be:
PARQUET-DIR
├── private
│   ├── PERSON_LINKS.parquet
│   └── PROCEDURE_OCCURRENCE_LINKS.parquet
├── public
│   └── PROCEDURE_OCCURRENCE.parquet
└── extract_summary.json
"""
logger.info(f"Populating queue(s) {queues} from {parquet_dir}")
for queue in queues.split(","):
with PixlProducer(queue_name=queue, **config["rabbitmq"]) as producer:
state_filepath = state_filepath_for_queue(queue)
if state_filepath.exists() and restart:
logger.info(f"Extracting messages from state: {state_filepath}")
inform_user_that_queue_will_be_populated_from(state_filepath)
messages = Messages.from_state_file(state_filepath)
else:
messages = messages_from_csv(Path(csv_filename))
messages = messages_from_state_file(state_filepath)
elif parquet_dir is not None:
messages = messages_from_parquet(parquet_dir)

remove_file_if_it_exists(state_filepath) # will be stale
producer.publish(sorted(messages, key=study_date_from_serialised))
producer.publish(sorted(messages, key=attrgetter("study_datetime")))


@cli.command()
@@ -268,81 +285,107 @@ def state_filepath_for_queue(queue_name: str) -> Path:
return Path(f"{queue_name.replace('/', '_')}.state")


class Messages(list):
def messages_from_state_file(filepath: Path) -> list[Message]:
"""
Class to represent messages
Return messages from a state file path
Methods
-------
from_state_file(cls, filepath)
Return messages from a state file path
:param filepath: Path for state file to be read
:return: A list of Message objects containing all the messages from the state file
"""
logger.info(f"Creating messages from {filepath}")
if not filepath.exists():
raise FileNotFoundError
if filepath.suffix != ".state":
msg = f"Invalid file suffix for {filepath}. Expected .state"
raise ValueError(msg)

@classmethod
def from_state_file(cls, filepath: Path) -> "Messages":
"""
Return messages from a state file path
:param filepath: Path for state file to be read
:return: A Messages object containing all the messages from the state file
"""
logger.info(f"Creating messages from {filepath}")
if not filepath.exists():
raise FileNotFoundError
if filepath.suffix != ".state":
msg = f"Invalid file suffix for {filepath}. Expected .state"
raise ValueError(msg)

return cls(
[
line.encode("utf-8")
for line in Path.open(filepath).readlines()
if string_is_non_empty(line)
]
)
return [
deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line)
]


def messages_from_csv(filepath: Path) -> Messages:
def messages_from_parquet(dir_path: Path) -> list[Message]:
"""
Reads patient information from CSV and transforms that into messages.
:param filepath: Path for CSV file to be read
Reads patient information from parquet files within directory structure
and transforms that into messages.
:param dir_path: Path for parquet directory containing private and public
files
"""
public_dir = dir_path / "public"
private_dir = dir_path / "private"
log_file = dir_path / "extract_summary.json"

for d in [public_dir, private_dir]:
if not d.is_dir():
err_str = f"{d} must exist and be a directory"
raise NotADirectoryError(err_str)

if not log_file.is_file():
err_str = f"{log_file} must exist and be a file"
raise FileNotFoundError(err_str)

# MRN in people.PrimaryMrn:
people = pd.read_parquet(private_dir / "PERSON_LINKS.parquet")
# accession number in accessions.AccessionNumber
accessions = pd.read_parquet(private_dir / "PROCEDURE_OCCURRENCE_LINKS.parquet")
# study_date is in procedure.procedure_date
procedure = pd.read_parquet(public_dir / "PROCEDURE_OCCURRENCE.parquet")
# joining data together
people_procedures = people.merge(procedure, on="person_id")
cohort_data = people_procedures.merge(accessions, on="procedure_occurrence_id")

expected_col_names = [
"VAL_ID",
"ACCESSION_NUMBER",
"STUDY_INSTANCE_UID",
"STUDY_DATE",
"PrimaryMrn",
"AccessionNumber",
"person_id",
"procedure_date",
"procedure_occurrence_id",
]
logger.debug(
f"Extracting messages from {filepath}. Expecting columns to include "
f"Extracting messages from {dir_path}. Expecting columns to include "
f"{expected_col_names}"
)

# First line is column names
messages_df = pd.read_csv(filepath, header=0, dtype=str)
messages = Messages()

if list(messages_df.columns)[:4] != expected_col_names:
msg = f"csv file expected to have at least {expected_col_names} as " f"column names"
raise ValueError(msg)

mrn_col_name, acc_num_col_name, _, dt_col_name = expected_col_names
for _, row in messages_df.iterrows():
messages.append(
serialise(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_datetime=datetime.datetime.strptime(
row[dt_col_name], "%d/%m/%Y %H:%M"
).replace(tzinfo=datetime.timezone.utc),
for col in expected_col_names:
if col not in list(cohort_data.columns):
msg = (
f"parquet files are expected to have at least {expected_col_names} as "
f"column names"
)
raise ValueError(msg)

(
mrn_col_name,
acc_num_col_name,
_,
dt_col_name,
procedure_occurrence_id,
) = expected_col_names

# Get project name and OMOP ES timestamp from log file
logs = json.load(log_file.open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"])

messages = []

for _, row in cohort_data.iterrows():
# Create a new message for each row
message = Message(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_datetime=row[dt_col_name],
procedure_occurrence_id=row[procedure_occurrence_id],
project_name=project_name,
omop_es_timestamp=omop_es_timestamp,
)
messages.append(message)

if len(messages) == 0:
msg = f"Failed to find any messages in {filepath}"
msg = f"Failed to find any messages in {dir_path}"
raise ValueError(msg)

logger.debug(f"Created {len(messages)} messages from {filepath}")
logger.info(f"Created {len(messages)} messages from {dir_path}")
return messages


@@ -405,12 +448,3 @@ def api_config_for_queue(queue_name: str) -> APIConfig:
raise ValueError(msg)

return APIConfig(config[config_key])


def study_date_from_serialised(message: bytes) -> datetime.datetime:
"""Get the study date from a serialised message as a datetime"""
result = deserialise(message)["study_datetime"]
if not isinstance(result, datetime.datetime):
msg = "Expected study date to be a datetime. Got %s"
raise TypeError(msg, type(result))
return result
