Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate daily rounds data to observations #2679

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions care/emr/migrations/0033_migrate_daily_rounds_to_obseravations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# Generated by Django 5.1.1 on 2024-12-11 08:29

from django.db import migrations
from django.core.paginator import Paginator
from django.db.models import F
from pathlib import Path
import json

migration_id = 31

quantity_value_types = {"decimal", "integer", "quantity"}


def get_nested_value(obj, key):
# jq
keys = key.split(".")
value = getattr(obj, keys[0], None)
for key in keys[1:]:
if isinstance(value, dict):
value = value.get(key, None)
else:
return None
return value

def get_observations(field_mapping, daily_round):
observations = []

if not daily_round.created_by_id:
# bad data in staging
print(f"Daily round {daily_round.id} has missing values {daily_round.created_by_id=}")
return observations

default_observation = {
"status": "final",
"subject_type": "patient",
"subject_id": daily_round.patient_external_id,
"patient_id": daily_round.patient_id,
"encounter_id": daily_round.consultation_id,
"effective_datetime": daily_round.taken_at or daily_round.created_date,
"created_date": daily_round.created_date,
"data_entered_by_id": daily_round.created_by_id,
"performer": {},
"meta": {
"is_migration": True,
"migration_id": migration_id,
"orignal_model": "DailyRound",
"orignal_model_id": daily_round.id,
"created_by_telemedicine": daily_round.created_by_telemedicine,
"round_type": daily_round.rounds_type,
"in_prone_position": daily_round.in_prone_position,
}
}

# set nested json values as attributes on the object so that we can access them easily
for array_field in ("output", "nursing", "pressure_sore"):
for item in getattr(daily_round, array_field, []):
for key, value in item.items():
# TODO: check if we need deduplication
setattr(daily_round, f"{array_field}__{key.lower()}", value)

for field in field_mapping:
try:
daily_round_value = None
daily_round_note = None

#TODO: instead of always using the helper we can ask it explicitly from the field mapping data
if field["value_key"]:
daily_round_value = get_nested_value(daily_round, field["value_key"])
if field["notes_key"]:
daily_round_note = get_nested_value(daily_round, field["value_key"])

if daily_round_value or daily_round_note:
value = {}
if daily_round_value:
value["value"] = str(daily_round_value)

if field["value_map"]:
value["value_code"] = field["value_map"].get(daily_round_value, {})

if field["value_type"] in quantity_value_types:
value["value_quantity"] = {
"value": float(daily_round_value),
}
if field["unit_code"]:
value["value_quantity"]["unit"] = field["unit_code"]
# if field["quantity_code"]:
# TODO: maybe this needs to be derived from the value?
# value["value_quantity"]["code"] = field["quantity_code"]

observations.append(dict(
**default_observation,
category = field["category_code"],
main_code = field["main_code"],
alternate_coding = field["alternate_coding"],
value_type = field["value_type"],
value = value,
body_site = field["body_site"],
method = field["method"],
))
except Exception as e:
print(f"Error while processing {field['value_key']} for {daily_round.id=}: {e}")
return observations


def migrate_daily_rounds_to_observation(apps, schema_editor):
DailyRound = apps.get_model("facility", "DailyRound")
Observation = apps.get_model("emr", "Observation")

#load filed mapping from json
field_mapping_file = Path(__file__).parent / "daily_round_to_observations_field_mapping.json"
with field_mapping_file.open() as f:
field_mapping = json.load(f)

queryset = DailyRound.objects.all().order_by("id").annotate(
patient_id=F("consultation__patient_id"),
patient_external_id=F("consultation__patient__external_id"),
)
paginator = Paginator(queryset, 3000) # TODO: come up with a better batch size
for page_num in paginator.page_range:
bulk_observations = []
daily_rounds = paginator.get_page(page_num)
for daily_round in daily_rounds:
bulk_observations.extend(get_observations(field_mapping, daily_round))
Observation.objects.bulk_create(
[Observation(**observation) for observation in bulk_observations],
)


def reverse_migration(apps, schema_editor):
# drop all observations created by this migration
schema_editor.execute("DELETE FROM emr_observation WHERE meta->>'migration_id' = %s", [str(migration_id)])


class Migration(migrations.Migration):

dependencies = [
('emr', '0032_alter_medicationstatement_information_source'),
]

operations = [
migrations.RunPython(migrate_daily_rounds_to_observation,
reverse_code=reverse_migration),
]
Loading
Loading