Skip to content

Commit

Permalink
New Link API Endpoint (#79)
Browse files Browse the repository at this point in the history
## Description
Create a new API endpoint, /link, that behaves similar to the existing
/link-record endpoint. The difference is that it will accept a raw PII
payload rather than a FHIR bundle.

## Related Issues
closes #5 

## Additional Notes
Overview of changes
- Adds a new endpoint `/link`
- Adds tests for new endpoint
- updates `src.recordlinker.linking.link.link_record_against_mpi`
function to accept a `schemas.PIIRecord` instead of a FHIR bundle.
- `src.recordlinker.linking.link.link_record_against_mpi` now returns a
`patient_reference_id` in addition to the other values in the returned
tuple
- Adds `patient_reference_id` column to `patient` mpi table
- Renames `internal_id` column to `person_reference_id` in the `person`
mpi table
  • Loading branch information
cbrinson-rise8 authored Oct 17, 2024
1 parent cbe8c78 commit 327d2f8
Show file tree
Hide file tree
Showing 8 changed files with 382 additions and 188 deletions.
4 changes: 4 additions & 0 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ coverage:
default:
target: 90%
threshold: 10%
patch:
default:
target: 90%
threshold: 10%

Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add reference_id to person and patient
Revision ID: fb1b1ba0ad1f
Revises: a30b8bbccfdf
Create Date: 2024-10-15 14:57:00.688268
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'fb1b1ba0ad1f'
down_revision: Union[str, None] = 'a30b8bbccfdf'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('mpi_patient', sa.Column('reference_id', sa.Uuid(), nullable=False))
op.add_column('mpi_person', sa.Column('reference_id', sa.Uuid(), nullable=False))
op.drop_column('mpi_person', 'internal_id')
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('mpi_person', sa.Column('internal_id', sa.CHAR(length=32), nullable=False))
op.drop_column('mpi_person', 'reference_id')
op.drop_column('mpi_patient', 'reference_id')
# ### end Alembic commands ###
24 changes: 11 additions & 13 deletions src/recordlinker/linking/link.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import collections
import typing
import uuid

import pydantic
from opentelemetry import trace
Expand Down Expand Up @@ -53,7 +54,7 @@ def fhir_record_to_pii_record(fhir_record: dict) -> schemas.PIIRecord:
# TODO: This is a FHIR specific function, should be moved to a FHIR module
def add_person_resource(
person_id: str,
patient_id: str,
patient_id: typing.Optional[str] = "",
bundle: dict = pydantic.Field(description="A FHIR bundle"),
) -> dict:
"""
Expand Down Expand Up @@ -105,13 +106,12 @@ def compare(
results.append(result)
return matching_rule(results, **kwargs) # type: ignore


def link_record_against_mpi(
record: dict,
record: schemas.PIIRecord,
session: orm.Session,
algorithm: models.Algorithm,
external_person_id: typing.Optional[str] = None,
) -> tuple[bool, str]:
) -> tuple[bool, uuid.UUID, uuid.UUID]:
"""
Runs record linkage on a single incoming record (extracted from a FHIR
bundle) using an existing database as an MPI. Uses a flexible algorithm
Expand All @@ -122,7 +122,7 @@ def link_record_against_mpi(
the new record is linked to the person with the strongest membership
percentage.
:param record: The FHIR-formatted patient resource to try to match to
:param record: The PIIRecord to try to match to
other records in the MPI.
:param session: The SQLAlchemy session to use for database operations.
:param algorithm: An algorithm configuration object
Expand All @@ -131,9 +131,6 @@ def link_record_against_mpi(
Person entity now associated with the incoming patient (either a
new Person ID or the ID of an existing matched Person).
"""
# Extract the PII values from the incoming record
pii_record: schemas.PIIRecord = fhir_record_to_pii_record(record)

# Membership scores need to persist across linkage passes so that we can
# find the highest scoring match across all passes
scores: dict[models.Person, float] = collections.defaultdict(float)
Expand All @@ -146,7 +143,7 @@ def link_record_against_mpi(
# block on the pii_record and the algorithm's blocking criteria, then
# iterate over the patients, grouping them by person
with TRACER.start_as_current_span("link.block"):
patients = mpi_service.get_block_data(session, pii_record, algorithm_pass)
patients = mpi_service.get_block_data(session, record, algorithm_pass)
for patient in patients:
clusters[patient.person].append(patient)

Expand All @@ -158,7 +155,7 @@ def link_record_against_mpi(
for patient in patients:
# increment our match count if the pii_record matches the patient
with TRACER.start_as_current_span("link.compare"):
if compare(pii_record, patient, algorithm_pass):
if compare(record, patient, algorithm_pass):
matched_count += 1
# calculate the match ratio for this person cluster
match_ratio = matched_count / len(patients)
Expand All @@ -175,10 +172,11 @@ def link_record_against_mpi(
with TRACER.start_as_current_span("insert"):
patient = mpi_service.insert_patient(
session,
pii_record,
record,
matched_person,
pii_record.external_id,
record.external_id,
external_person_id,
)

# return a tuple indicating whether a match was found and the person ID
return (bool(matched_person), str(patient.person.internal_id))
return (bool(matched_person), patient.person.reference_id, patient.reference_id)
85 changes: 80 additions & 5 deletions src/recordlinker/main.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import typing
import uuid
from pathlib import Path
from typing import Annotated
from typing import Optional
Expand All @@ -15,6 +16,7 @@
from sqlalchemy import orm
from sqlalchemy.sql import expression

from recordlinker import schemas
from recordlinker import utils
from recordlinker.base_service import BaseService
from recordlinker.database import get_session
Expand Down Expand Up @@ -74,6 +76,41 @@ class LinkRecordResponse(BaseModel):
default="",
)

class LinkInput(BaseModel):
"""
Schema for requests to the /linking endpoint.
"""

record: schemas.PIIRecord = Field(
description="A PIIRecord to be checked"
)
algorithm: Optional[str] = Field(
description="Optionally, a string that maps to an algorithm label stored in "
"algorithm table",
default=None,
)
external_person_id: Optional[str] = Field(
description="The External Identifier, provided by the client,"
" for a unique patient/person that is linked to patient(s)",
default=None,
)

class LinkResponse(BaseModel):
"""
Schema for requests to the /link endpoint.
"""

is_match: bool = Field(
description="A true value indicates that one or more existing records "
"matched with the provided record, and these results have been linked."
)
patient_reference_id: uuid.UUID = Field(
description="The unique identifier for the patient that has been linked"
)
person_reference_id: uuid.UUID = Field(
description="The identifier for the person that the patient record has "
"been linked to.",
)

class HealthCheckResponse(BaseModel):
"""
Expand Down Expand Up @@ -156,19 +193,20 @@ async def link_record(
updated_bundle=input_bundle,
message="Supplied bundle contains no Patient resource to link on."
)

#convert record to PII
pii_record: schemas.PIIRecord = link.fhir_record_to_pii_record(record_to_link)

# Now link the record
try:
# Make a copy of record_to_link so we don't modify the original
record = copy.deepcopy(record_to_link)
(found_match, new_person_id) = link.link_record_against_mpi(
record=record,
(found_match, new_person_id, _) = link.link_record_against_mpi(
record=pii_record,
session=db_session,
algorithm=algorithm,
external_person_id=external_id,
)
updated_bundle = link.add_person_resource(
new_person_id, record_to_link.get("id", ""), input_bundle
str(new_person_id), pii_record.external_id, input_bundle
)
return LinkRecordResponse(found_match=found_match, updated_bundle=updated_bundle)

Expand All @@ -180,6 +218,43 @@ async def link_record(
message=f"Could not connect to database: {err}"
)

@app.post("/link")
async def link_piirecord(
request: Request,
input: Annotated[LinkInput, Body()],
response: Response,
db_session: orm.Session = Depends(get_session),
) -> LinkResponse:
"""
Compare a PII Reocrd with records in the Master Patient Index (MPI) to
check for matches with existing patient records If matches are found,
returns the patient and person reference id's
"""
pii_record = input.record

external_id = input.external_person_id
algorithm = algorithm_service.get_algorithm_by_label(db_session, input.algorithm)

if not algorithm:
response.status_code = status.HTTP_422_UNPROCESSABLE_ENTITY
raise HTTPException(status_code=422, detail="Error: Invalid algorithm specified")

#link the record
try:
# Make a copy of record_to_link so we don't modify the original
record = copy.deepcopy(pii_record)
(found_match, new_person_id, patient_reference_id) = link.link_record_against_mpi(
record=record,
session=db_session,
algorithm=algorithm,
external_person_id=external_id,
)
return LinkResponse(is_match=found_match, patient_reference_id=patient_reference_id, person_reference_id=new_person_id)

except ValueError:
response.status_code = status.HTTP_400_BAD_REQUEST
raise HTTPException(status_code=400, detail="Error: Bad request")


@app.get("/algorithms")
async def get_algorithm_labels(
Expand Down
3 changes: 2 additions & 1 deletion src/recordlinker/models/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Person(Base):
__tablename__ = "mpi_person"

id: orm.Mapped[int] = orm.mapped_column(get_bigint_pk(), autoincrement=True, primary_key=True)
internal_id: orm.Mapped[uuid.UUID] = orm.mapped_column(default=uuid.uuid4)
reference_id: orm.Mapped[uuid.UUID] = orm.mapped_column(default=uuid.uuid4)
patients: orm.Mapped[list["Patient"]] = orm.relationship(back_populates="person")

def __hash__(self):
Expand Down Expand Up @@ -50,6 +50,7 @@ class Patient(Base):
external_person_id: orm.Mapped[str] = orm.mapped_column(sqltypes.String(255), nullable=True)
external_person_source: orm.Mapped[str] = orm.mapped_column(sqltypes.String(100), nullable=True)
blocking_values: orm.Mapped[list["BlockingValue"]] = orm.relationship(back_populates="patient")
reference_id: orm.Mapped[uuid.UUID] = orm.mapped_column(default=uuid.uuid4)

@classmethod
def _scrub_empty(cls, data: dict) -> dict:
Expand Down
18 changes: 9 additions & 9 deletions tests/unit/linking/test_link.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,18 @@ class TestLinkRecordAgainstMpi:
@pytest.fixture
def patients(self):
bundle = utils.read_json_from_assets("linking", "patient_bundle_to_link_with_mpi.json")
patients = []
patients: list[schemas.PIIRecord] = []
for entry in bundle["entry"]:
if entry.get("resource", {}).get("resourceType", {}) == "Patient":
patients.append(entry["resource"])
patients.append(link.fhir_record_to_pii_record(entry["resource"]))
return patients

def test_basic_match_one(self, session, basic_algorithm, patients):
# Test various null data values in incoming record
matches: list[bool] = []
mapped_patients: dict[str, int] = collections.defaultdict(int)
for idx, patient in enumerate(patients[:2]):
matched, pid = link.link_record_against_mpi(patient, session, basic_algorithm)
for patient in patients[:2]:
matched, pid, _patient_reference_id = link.link_record_against_mpi(patient, session, basic_algorithm)
matches.append(matched)
mapped_patients[pid] += 1

Expand All @@ -130,7 +130,7 @@ def test_basic_match_two(self, session, basic_algorithm, patients):
matches: list[bool] = []
mapped_patients: dict[str, int] = collections.defaultdict(int)
for patient in patients:
matched, pid = link.link_record_against_mpi(patient, session, basic_algorithm)
matched, pid, _patient_reference_id = link.link_record_against_mpi(patient, session, basic_algorithm)
matches.append(matched)
mapped_patients[pid] += 1

Expand All @@ -146,16 +146,16 @@ def test_basic_match_two(self, session, basic_algorithm, patients):
assert matches == [False, True, False, True, False, False]
assert sorted(list(mapped_patients.values())) == [1, 1, 1, 3]

def test_enhanced_match_three(self, session, enhanced_algorithm, patients):
def test_enhanced_match_three(self, session, enhanced_algorithm, patients: list[schemas.PIIRecord]):
# add an additional patient that will fuzzy match to patient 0
patient0_copy = copy.deepcopy(patients[0])
patient0_copy["id"] = str(uuid.uuid4())
patient0_copy["name"][0]["given"][0] = "Jhon"
patient0_copy.external_id = str(uuid.uuid4())
patient0_copy.name[0].given[0] = "Jhon"
patients.append(patient0_copy)
matches: list[bool] = []
mapped_patients: dict[str, int] = collections.defaultdict(int)
for patient in patients:
matched, pid = link.link_record_against_mpi(patient, session, enhanced_algorithm)
matched, pid, _patient_reference_id = link.link_record_against_mpi(patient, session, enhanced_algorithm)
matches.append(matched)
mapped_patients[pid] += 1

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/linking/test_mpi_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_no_person(self, session):
assert patient.data["name"] == [{"given": ["Johnathon", "Bill",], "family": "Smith"}]
assert patient.external_person_id is None
assert patient.external_person_source is None
assert patient.person.internal_id is not None
assert patient.person.reference_id is not None
assert patient.person.id == patient.person_id
assert len(patient.blocking_values) == 4

Expand All @@ -64,7 +64,7 @@ def test_no_person_with_external_id(self, session):
assert patient.data["name"] == [{"given": ["Johnathon",], "family": "Smith"}]
assert patient.external_person_id == "123456"
assert patient.external_person_source == "IRIS"
assert patient.person.internal_id is not None
assert patient.person.reference_id is not None
assert patient.person.id is not None
assert patient.person.id == patient.person_id
assert len(patient.blocking_values) == 3
Expand Down
Loading

0 comments on commit 327d2f8

Please sign in to comment.