Skip to content

Commit

Permalink
Don't send images to imaging queue if they have been exported (#202)
Browse files Browse the repository at this point in the history
* Create tables for image tracking

* Create dummy testing for CLI-database interaction

* Create dummy testing for CLI-database interaction

* Filter message if image has been exported before

* Filter pacs images that have been exported

* Use context manager in test session

It's a bit nicer

* Add postgres config to end to end test

* Finish comment 🤦

* Use url creator

* Add headers

* Add headers

* Add headers

last one?

* Pass in project slug to filtering

* Apply suggestions from code review

Co-authored-by: Milan Malfait <m.malfait@ucl.ac.uk>

* Use pixl schemata for tables

* Ensure images rows are added to the database

* Add nice representation for database entities

* Ensure different extracts can have the same images

* Remove old comment

* Update documentation of database fixtures

---------

Co-authored-by: ruaridhg <ruaridhg@users.noreply.github.com>
Co-authored-by: Milan Malfait <m.malfait@ucl.ac.uk>
  • Loading branch information
3 people authored Jan 4, 2024
1 parent 38c5040 commit 4cb94ac
Show file tree
Hide file tree
Showing 14 changed files with 476 additions and 27 deletions.
32 changes: 32 additions & 0 deletions cli/src/pixl_cli/_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Configuration of CLI from config file."""
from pathlib import Path

import yaml


def _load_config(filename: str = "pixl_config.yml") -> dict:
"""CLI configuration generated from a .yaml file"""
if not Path(filename).exists():
msg = f"Failed to find {filename}. It must be present in the current working directory"
raise FileNotFoundError(msg)

with Path(filename).open() as config_file:
config_dict = yaml.safe_load(config_file)
return dict(config_dict)


# Loaded once, at import time, from "pixl_config.yml" in the current working directory;
# importing this module therefore raises FileNotFoundError when that file is missing.
cli_config = _load_config()
112 changes: 112 additions & 0 deletions cli/src/pixl_cli/_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Interaction with the PIXL database."""

from core.database import Extract, Image
from core.patient_queue.message import Message
from sqlalchemy import URL, create_engine
from sqlalchemy.orm import Session, sessionmaker

from pixl_cli._config import cli_config

# Connection settings come from the "postgres" section of the CLI configuration file
connection_config = cli_config["postgres"]

# Build the URL from its parts instead of formatting a connection string by hand
url = URL.create(
    "postgresql+psycopg2",
    **{
        field: connection_config[field]
        for field in ("username", "password", "host", "port", "database")
    },
)

# Module-level engine that the sessions created in this module are bound to
engine = create_engine(url)


def filter_exported_or_add_to_db(messages: list[Message], project_slug: str) -> list[Message]:
    """
    Drop the messages whose image has already been exported for this project.

    As a side effect the extract for ``project_slug``, and every image not yet known for it,
    is added to the database within a single transaction.

    :param messages: Initial messages to filter if they already exist
    :param project_slug: project slug to query on
    :return: messages that have not been exported, in their original order
    """
    # sessionmaker.begin() provides a session with a transaction already started; leaving the
    # block commits (or rolls back if an exception escapes) and closes the session
    with sessionmaker(engine).begin() as db_session:
        project_extract, is_new_extract = _get_or_create_project(project_slug, db_session)
        return _filter_exported_messages(
            project_extract, messages, db_session, extract_created=is_new_extract
        )


def _get_or_create_project(project_slug: str, session: Session) -> tuple[Extract, bool]:
    """
    Fetch the extract with the given slug, adding a new one to the session if none exists.

    :param project_slug: slug to look the extract up by
    :param session: session used for the lookup and for adding a new extract
    :return: the extract, and whether it was newly created by this call
    """
    slug_query = session.query(Extract).filter(Extract.slug == project_slug)
    extract = slug_query.one_or_none()
    if not extract:
        extract = Extract(slug=project_slug)
        session.add(extract)
        return extract, True
    return extract, False


def _filter_exported_messages(
    extract: Extract, messages: list[Message], session: Session, *, extract_created: bool
) -> list[Message]:
    """
    Keep only the messages whose image has not been exported for this extract.

    Every message is checked in order, so images that are missing from the database are
    added to the session along the way.

    :param extract: extract the images belong to
    :param messages: messages to check
    :param session: session used for lookups and for adding new images
    :param extract_created: whether the extract was only just created
    :return: messages that have not been exported, in their original order
    """
    return [
        message
        for message in messages
        # index 1 of the returned pair is the "already exported" flag
        if not _get_image_and_check_exported(
            extract, message, session, extract_created=extract_created
        )[1]
    ]


def _get_image_and_check_exported(
    extract: Extract, message: Message, session: Session, *, extract_created: bool
) -> tuple[Image, bool]:
    """
    Find the image matching a message within an extract, adding it if it is not there yet.

    :param extract: extract the image belongs to
    :param message: message providing the accession number, MRN and study date to match on
    :param session: session used for the lookup and for adding a new image
    :param extract_created: whether the extract was only just created; if so the lookup is
        skipped and the image is always added
    :return: the image, and whether it has been exported (``exported_at`` is set)
    """
    # NOTE(review): for a freshly created extract no lookup happens, so two messages with
    # identical identifiers in the same batch would each add an image row - confirm that
    # callers never pass duplicates.
    if not extract_created:
        matching_image = (
            session.query(Image)
            .filter(
                Image.extract == extract,
                Image.accession_number == message.accession_number,
                Image.mrn == message.mrn,
                Image.study_date == message.study_date,
            )
            .one_or_none()
        )
        if matching_image:
            return matching_image, matching_image.exported_at is not None

    # Either the extract is new or this image has not been seen for it before
    return _add_new_image_to_session(extract, message, session), False


def _add_new_image_to_session(extract: Extract, message: Message, session: Session) -> Image:
    """
    Create an image row from a message, attach it to the extract and add it to the session.

    :param extract: extract the new image belongs to
    :param message: message providing the accession number, study date and MRN
    :param session: session the new image is added to
    :return: the newly created image
    """
    image_fields = {
        "accession_number": message.accession_number,
        "study_date": message.study_date,
        "mrn": message.mrn,
        "extract": extract,
    }
    image = Image(**image_fields)
    session.add(image)
    return image
45 changes: 21 additions & 24 deletions cli/src/pixl_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,18 @@

import click
import requests
import yaml
from core.patient_queue.producer import PixlProducer
from core.patient_queue.subscriber import PixlBlockingConsumer

from ._io import copy_parquet_return_logfile_fields, messages_from_parquet, messages_from_state_file
from ._logging import logger, set_log_level
from ._utils import clear_file, remove_file_if_it_exists


def _load_config(filename: str = "pixl_config.yml") -> dict:
"""CLI configuration generated from a .yaml file"""
if not Path(filename).exists():
msg = f"Failed to find {filename}. It must be present " f"in the current working directory"
raise OSError(msg)

with Path(filename).open() as config_file:
config_dict = yaml.safe_load(config_file)
return dict(config_dict)


config = _load_config()
from pixl_cli._config import cli_config
from pixl_cli._database import filter_exported_or_add_to_db
from pixl_cli._io import (
copy_parquet_return_logfile_fields,
messages_from_parquet,
messages_from_state_file,
)
from pixl_cli._logging import logger, set_log_level
from pixl_cli._utils import clear_file, remove_file_if_it_exists

# localhost needs to be added to the NO_PROXY environment variables on GAEs
os.environ["NO_PROXY"] = os.environ["no_proxy"] = "localhost"
Expand All @@ -58,7 +49,7 @@ def cli(*, debug: bool) -> None:
@cli.command()
@click.option(
"--queues",
default="ehr,pacs",
default="pacs,ehr",
show_default=True,
help="Comma seperated list of queues to populate with messages generated from the "
"input file(s)",
Expand Down Expand Up @@ -101,8 +92,14 @@ def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None:

remove_file_if_it_exists(state_filepath) # will be stale

with PixlProducer(queue_name=queue, **config["rabbitmq"]) as producer:
producer.publish(sorted(messages, key=attrgetter("study_date")))
sorted_messages = sorted(messages, key=attrgetter("study_date"))
# For imaging, we don't want to query again for images that have already been exported
if queue == "pacs" and messages:
sorted_messages = filter_exported_or_add_to_db(
sorted_messages, messages[0].project_name
)
with PixlProducer(queue_name=queue, **cli_config["rabbitmq"]) as producer:
producer.publish(sorted_messages)


@cli.command()
Expand Down Expand Up @@ -272,7 +269,7 @@ def consume_all_messages_and_save_csv_file(queue_name: str, timeout_in_seconds:
f"{timeout_in_seconds} seconds"
)

with PixlBlockingConsumer(queue_name=queue_name, **config["rabbitmq"]) as consumer:
with PixlBlockingConsumer(queue_name=queue_name, **cli_config["rabbitmq"]) as consumer:
state_filepath = state_filepath_for_queue(queue_name)
if consumer.message_count > 0:
logger.info("Found messages in the queue. Clearing the state file")
Expand Down Expand Up @@ -337,11 +334,11 @@ def api_config_for_queue(queue_name: str) -> APIConfig:
"""Configuration for an API associated with a queue"""
config_key = f"{queue_name}_api"

if config_key not in config:
if config_key not in cli_config:
msg = (
f"Cannot update the rate for {queue_name}. {config_key} was"
f" not specified in the configuration"
)
raise ValueError(msg)

return APIConfig(config[config_key])
return APIConfig(cli_config[config_key])
55 changes: 55 additions & 0 deletions cli/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
import pathlib

import pytest
from core.database import Base, Extract, Image
from core.omop import OmopExtract
from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import Session, sessionmaker


@pytest.fixture(autouse=True)
Expand All @@ -37,3 +40,55 @@ def omop_files(tmp_path_factory: pytest.TempPathFactory, monkeypatch) -> OmopExt
def resources() -> pathlib.Path:
"""Test resources directory path."""
return pathlib.Path(__file__).parent / "resources"


@pytest.fixture(scope="module")
def monkeymodule():
    """
    Module level monkey patch.

    Yields a ``MonkeyPatch`` instance that module scoped fixtures can use; every patch made
    through it is undone when the module's tests have finished.
    """
    # Use the public pytest API rather than importing from the private ``_pytest`` package;
    # the context manager undoes all patches on exit
    with pytest.MonkeyPatch.context() as module_monkeypatch:
        yield module_monkeypatch


@pytest.fixture(autouse=True, scope="module")
def db_engine(monkeymodule) -> Engine:
    """
    Patches the CLI's database engine with an in memory SQLite engine.

    All tables are created before the engine is handed out and dropped again at teardown.
    :returns Engine: Engine for use in other setup fixtures
    """
    in_memory_engine = create_engine(
        "sqlite:///:memory:",
        # SQLite doesn't support schemas, so map the "pixl" schema to no schema at all
        execution_options={"schema_translate_map": {"pixl": None}},
        echo=True,
        echo_pool="debug",
        future=True,
    )
    monkeymodule.setattr("pixl_cli._database.engine", in_memory_engine)

    metadata = Base.metadata
    metadata.create_all(in_memory_engine)
    yield in_memory_engine
    metadata.drop_all(in_memory_engine)


@pytest.fixture()
def db_session(db_engine) -> Session:
    """
    Provides a session bound to the in memory database, emptied of data during setup.

    :returns Session: Session for use in other setup fixtures.
    """
    session = sessionmaker(db_engine)()
    with session:
        # sqlite with sqlalchemy doesn't rollback, so all entities are deleted by hand;
        # images go first, then the extracts they belong to
        for entity in (Image, Extract):
            session.query(entity).delete()
        yield session
    # explicit close on top of the one performed when the with block exits
    session.close()
6 changes: 6 additions & 0 deletions cli/tests/pixl_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ ehr_api:
default_rate: 5 # ~queries per second
pacs_api:
default_rate: 5 # queries per second
postgres:
host: host
port: 5432
username: username
password: password
database: database
Loading

0 comments on commit 4cb94ac

Please sign in to comment.