
Parquet input to OS #186

Merged: 29 commits, Dec 21, 2023
Changes from 22 commits
5b60fd4
Add -e to pip install command in cli test
ruaridhg Dec 14, 2023
68cc3bb
Added flag for csv=True and ran tests for cli
ruaridhg Dec 14, 2023
91b5dfe
Added test for parquet directory and messages_from_parquet func
ruaridhg Dec 15, 2023
20988f7
Use resources fixture to find test parquet files. Make input options
jeremyestein Dec 15, 2023
e6bfb7d
Add synthetic "private" OMOP data taken from the issue
jeremyestein Dec 18, 2023
faf50dc
Add dependency for reading parquet files
jeremyestein Dec 18, 2023
9129be3
Add procedure occurrence id column to message. Use pandas merge instead
jeremyestein Dec 18, 2023
273e38e
Remove option for CSV cohort file
jeremyestein Dec 18, 2023
bbc101b
Merge branch 'main' into rmg/parquet_inp
jeremyestein Dec 18, 2023
2245c96
formatting changes
jeremyestein Dec 18, 2023
48efeea
Linting fixes
jeremyestein Dec 18, 2023
48db87a
Fix other uses of modified method
jeremyestein Dec 18, 2023
0915cc8
ruff fix
jeremyestein Dec 18, 2023
a9691a1
Downstream class needs new attribute as well
jeremyestein Dec 18, 2023
77da37a
Add type annotations to `test_messages_from_parquet()`
milanmlft Dec 18, 2023
3b37183
Add project name and OMOP ES timestamp to rabbitmq messages
milanmlft Dec 18, 2023
42b1c85
Update `serialise()` callers with new arguments
milanmlft Dec 18, 2023
bf39b46
Add `project_name` and `omop_es_timestamp` fields to `ImagingStudy` c…
milanmlft Dec 19, 2023
90b6672
Refactor message serialisation and deserialisation (#197)
milanmlft Dec 20, 2023
a243f6f
Format README
milanmlft Dec 20, 2023
b4c7a51
Update documentation with new parquet inputs
milanmlft Dec 20, 2023
10dd3e6
Update system test to use parquet files
milanmlft Dec 20, 2023
1192210
Move `extract_summary.json` one level up and update docs
milanmlft Dec 21, 2023
6b4f38c
Change `parquet-dir` to a click argument instead of option
milanmlft Dec 21, 2023
3a04feb
`click.argument` doesn't have a `help` parameter
milanmlft Dec 21, 2023
1a2fab4
Slim down parquet test files to only those needed
milanmlft Dec 21, 2023
c4cecc9
Raise more appropriate exceptions for missing file or directory
milanmlft Dec 21, 2023
76c3897
Print log message about messages created during regular runs
milanmlft Dec 21, 2023
845e826
Update error message for parquet files
milanmlft Dec 21, 2023
38 changes: 26 additions & 12 deletions cli/README.md
@@ -1,11 +1,10 @@
# PIXL Driver + Command line interface

The PIXL CLI driver provides functionality to populate a queue with messages
containing information required to run electronic health queries against the
EMAP star database and the PACS image system. Once a set of queues are
populated the consumers can be started, updated and the system extractions
stopped cleanly.

## Installation

@@ -14,43 +13,58 @@ pip install -e ../pixl_core/ .
```

## Test

```bash
./tests/run-tests.sh
```


## Usage

> **Note**
> Services must be started prior to using the CLI

See the commands and subcommands with

```bash
pixl --help
```

Populate queue for PACS and EHR extraction

```bash
pixl populate <filename>.csv
pixl populate --parquet-dir </path/to/parquet_dir>
```
where the csv file contains MRN, accession numbers and timestamps in the format:

| VAL_ID | ACCESSION_NUMBER | STUDY_INSTANCE_UID | STUDY_DATE | ... |
|--------|------------------|--------------------|------------------|-----|
| X | Y | Z | 29/02/2010 05:12 | |
where `parquet_dir` contains at least the following files:

```sh
resources
└── omop
├── log
│ └── extract_summary.json
├── private
│ ├── PERSON_LINKS.parquet
│ └── PROCEDURE_OCCURRENCE_LINKS.parquet
└── public
└── PROCEDURE_OCCURRENCE.parquet
```
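The `log/extract_summary.json` file supplies the project name and OMOP ES timestamp that end up in each queued message. A minimal sketch of how those two fields are read, with the key names taken from the CLI code in this PR and the values made up for illustration:

```python
import datetime
import json
import tempfile
from pathlib import Path

# Toy extract_summary.json with just the two fields the CLI reads;
# the key names follow the diff, the values are made up.
sample = {
    "settings": {"cdm_source_name": "test-extract-uclh-omop-cdm"},
    "datetime": "2023-12-21T10:00:00",
}

with tempfile.TemporaryDirectory() as tmp:
    log_file = Path(tmp) / "extract_summary.json"
    log_file.write_text(json.dumps(sample))

    logs = json.loads(log_file.read_text())
    project_name = logs["settings"]["cdm_source_name"]
    omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"])
```

Here `cdm_source_name` becomes the message's project name and the ISO-format `datetime` its extract timestamp.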

Start the PACS extraction

```bash
pixl start --queues pacs
```

and equivalently the EHR extraction

```bash
pixl start --queues ehr
```

Use `pixl start --help` for information.

Stop PACS and EHR database extraction

```bash
pixl stop
```
1 change: 1 addition & 0 deletions cli/pyproject.toml
@@ -15,6 +15,7 @@ dependencies = [
"click==8.1.3",
"coloredlogs==15.0.1",
"pandas==1.5.1",
"pyarrow==14.0.1",
"PyYAML==6.0"
]

169 changes: 93 additions & 76 deletions cli/src/pixl_cli/main.py
@@ -16,16 +16,17 @@
import datetime
import json
import os
from operator import attrgetter
from pathlib import Path
from typing import Any, Optional

import click
import pandas as pd
import requests
import yaml
from core.patient_queue.message import Message, deserialise
from core.patient_queue.producer import PixlProducer
from core.patient_queue.subscriber import PixlBlockingConsumer
from core.patient_queue.utils import deserialise, serialise

from ._logging import logger, set_log_level
from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty
@@ -56,35 +57,40 @@ def cli(*, debug: bool) -> None:


@cli.command()
@click.argument("csv_filename", type=click.Path(exists=True))
@click.option(
"--queues",
default="ehr,pacs",
show_default=True,
help="Comma separated list of queues to populate with messages generated from the " ".csv file",
help="Comma separated list of queues to populate with messages generated from the "
"input file(s)",
)
@click.option(
"--restart/--no-restart",
show_default=True,
default=True,
help="Restart from a saved state. Otherwise will use the given .csv file",
help="Restart from a saved state. Otherwise will use the given input file(s)",
)
def populate(csv_filename: str, queues: str, *, restart: bool) -> None:
"""Populate a (set of) queue(s) from a csv file"""
logger.info(f"Populating queue(s) {queues} from {csv_filename}")

@click.option(
"--parquet-dir",
required=True,
type=click.Path(path_type=Path, exists=True, file_okay=False),
help="Give a directory containing parquet input files",
)
def populate(queues: str, *, restart: bool, parquet_dir: Path) -> None:
"""Populate a (set of) queue(s) from a parquet file directory"""
logger.info(f"Populating queue(s) {queues} from {parquet_dir}")
for queue in queues.split(","):
with PixlProducer(queue_name=queue, **config["rabbitmq"]) as producer:
state_filepath = state_filepath_for_queue(queue)
if state_filepath.exists() and restart:
logger.info(f"Extracting messages from state: {state_filepath}")
inform_user_that_queue_will_be_populated_from(state_filepath)
messages = Messages.from_state_file(state_filepath)
else:
messages = messages_from_csv(Path(csv_filename))
messages = messages_from_state_file(state_filepath)
elif parquet_dir is not None:
messages = messages_from_parquet(parquet_dir)

remove_file_if_it_exists(state_filepath) # will be stale
producer.publish(sorted(messages, key=study_date_from_serialised))
producer.publish(sorted(messages, key=attrgetter("study_datetime")))
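Since messages are now `Message` objects rather than serialised bytes, sorting by `attrgetter("study_datetime")` replaces the old `study_date_from_serialised` helper. A small sketch with a stand-in dataclass (the real `Message` lives in `core.patient_queue.message`):

```python
import datetime
from dataclasses import dataclass
from operator import attrgetter


@dataclass
class Msg:  # stand-in for core.patient_queue.message.Message
    study_datetime: datetime.datetime


msgs = [
    Msg(datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc)),
    Msg(datetime.datetime(2023, 1, 1, tzinfo=datetime.timezone.utc)),
]
# Oldest study first, matching the publish order in populate()
ordered = sorted(msgs, key=attrgetter("study_datetime"))
```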


@cli.command()
@@ -268,81 +274,101 @@ def state_filepath_for_queue(queue_name: str) -> Path:
return Path(f"{queue_name.replace('/', '_')}.state")


class Messages(list):
def messages_from_state_file(filepath: Path) -> list[Message]:
"""
Class to represent messages
Return messages from a state file path

Methods
-------
from_state_file(cls, filepath)
Return messages from a state file path
:param filepath: Path for state file to be read
:return: A list of Message objects containing all the messages from the state file
"""
logger.info(f"Creating messages from {filepath}")
if not filepath.exists():
raise FileNotFoundError
if filepath.suffix != ".state":
msg = f"Invalid file suffix for {filepath}. Expected .state"
raise ValueError(msg)

@classmethod
def from_state_file(cls, filepath: Path) -> "Messages":
"""
Return messages from a state file path

:param filepath: Path for state file to be read
:return: A Messages object containing all the messages from the state file
"""
logger.info(f"Creating messages from {filepath}")
if not filepath.exists():
raise FileNotFoundError
if filepath.suffix != ".state":
msg = f"Invalid file suffix for {filepath}. Expected .state"
raise ValueError(msg)

return cls(
[
line.encode("utf-8")
for line in Path.open(filepath).readlines()
if string_is_non_empty(line)
]
)
return [
deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line)
]


def messages_from_csv(filepath: Path) -> Messages:
def messages_from_parquet(dir_path: Path) -> list[Message]:
"""
Reads patient information from CSV and transforms that into messages.
:param filepath: Path for CSV file to be read
Reads patient information from parquet files within directory structure
and transforms that into messages.
:param dir_path: Path for parquet directory containing private and public
files
"""
public_dir = dir_path / "public"
private_dir = dir_path / "private"
log_dir = dir_path / "log"

for d in [public_dir, private_dir, log_dir]:
if not d.is_dir():
err_str = f"{d} must exist and be a directory"
raise ValueError(err_str)

# MRN in people.PrimaryMrn:
people = pd.read_parquet(private_dir / "PERSON_LINKS.parquet")
# accession number in accessions.AccessionNumber
accessions = pd.read_parquet(private_dir / "PROCEDURE_OCCURRENCE_LINKS.parquet")
# study_date is in procedure.procedure_date
procedure = pd.read_parquet(public_dir / "PROCEDURE_OCCURRENCE.parquet")
# joining data together
people_procedures = people.merge(procedure, on="person_id")
cohort_data = people_procedures.merge(accessions, on="procedure_occurrence_id")

expected_col_names = [
"VAL_ID",
"ACCESSION_NUMBER",
"STUDY_INSTANCE_UID",
"STUDY_DATE",
"PrimaryMrn",
"AccessionNumber",
"person_id",
"procedure_date",
"procedure_occurrence_id",
]
logger.debug(
f"Extracting messages from {filepath}. Expecting columns to include "
f"Extracting messages from {dir_path}. Expecting columns to include "
f"{expected_col_names}"
)

# First line is column names
messages_df = pd.read_csv(filepath, header=0, dtype=str)
messages = Messages()

if list(messages_df.columns)[:4] != expected_col_names:
msg = f"csv file expected to have at least {expected_col_names} as " f"column names"
raise ValueError(msg)
for col in expected_col_names:
if col not in list(cohort_data.columns):
msg = f"parquet files expected to have at least {expected_col_names} as column names"
raise ValueError(msg)

mrn_col_name, acc_num_col_name, _, dt_col_name = expected_col_names
for _, row in messages_df.iterrows():
messages.append(
serialise(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_datetime=datetime.datetime.strptime(
row[dt_col_name], "%d/%m/%Y %H:%M"
).replace(tzinfo=datetime.timezone.utc),
)
(
mrn_col_name,
acc_num_col_name,
_,
dt_col_name,
procedure_occurrence_id,
) = expected_col_names

# Get project name and OMOP ES timestamp from log file
log_file = log_dir / "extract_summary.json"
logs = json.load(log_file.open())
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"])

messages = []

for _, row in cohort_data.iterrows():
# Create new dict to initialise message
message = Message(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_datetime=row[dt_col_name],
procedure_occurrence_id=row[procedure_occurrence_id],
project_name=project_name,
omop_es_timestamp=omop_es_timestamp,
)
messages.append(message)

if len(messages) == 0:
msg = f"Failed to find any messages in {filepath}"
msg = f"Failed to find any messages in {dir_path}"
raise ValueError(msg)

logger.debug(f"Created {len(messages)} messages from {filepath}")
logger.debug(f"Created {len(messages)} messages from {dir_path}")
return messages
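The two-step `pandas.merge` in `messages_from_parquet` links the three parquet files on their shared keys: people to procedures on `person_id`, then the result to accessions on `procedure_occurrence_id`. A toy illustration with column names taken from the diff and made-up values:

```python
import pandas as pd

# Toy stand-ins for the three parquet inputs; column names follow the diff.
people = pd.DataFrame({"person_id": [1], "PrimaryMrn": ["mrn-1"]})
procedures = pd.DataFrame(
    {
        "person_id": [1],
        "procedure_occurrence_id": [10],
        "procedure_date": ["2023-12-01"],
    }
)
accessions = pd.DataFrame(
    {"procedure_occurrence_id": [10], "AccessionNumber": ["acc-1"]}
)

# Join people to procedures, then attach accession numbers.
people_procedures = people.merge(procedures, on="person_id")
cohort_data = people_procedures.merge(accessions, on="procedure_occurrence_id")
```

Each row of `cohort_data` then carries everything one message needs: MRN, accession number, study date, and procedure occurrence id.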


@@ -405,12 +431,3 @@ def api_config_for_queue(queue_name: str) -> APIConfig:
raise ValueError(msg)

return APIConfig(config[config_key])


def study_date_from_serialised(message: bytes) -> datetime.datetime:
"""Get the study date from a serialised message as a datetime"""
result = deserialise(message)["study_datetime"]
if not isinstance(result, datetime.datetime):
msg = "Expected study date to be a datetime. Got %s"
raise TypeError(msg, type(result))
return result