Commit
TN-3291 implement caching of research data, for performance improvement (#4382)

Reports from the field: generation of research_data has become
unreliable due to the time it takes to build the report.

We now retain a table of cached values, one row per questionnaire
response, for responsive report generation.

The primary concern is cache invalidation, especially when an
organization's research protocol changes or a user's consent date changes.

A scheduled job now runs daily to seek out any overlooked QNRs missing
from the new cache table `research_data`. A dry run against the
prod db:
```
"found 92748 questionnaire responses missing from research_data cache"
[...]
""Task portal.tasks.cache_research_data_task[f87d7130-d2b2-4413-a461-61cf3ebc9594] succeeded in 32897.385058208994s"
```
Thereafter, it will generally find zero, as the cache is updated
immediately on QNR PUT/POST.
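
The task body itself isn't shown in this page's diff. As a minimal sketch of the detection step the message describes — import paths are inferred from the files in this commit, and `missing_from_research_data_cache` is an illustrative name, not the portal API:

```
# Sketch only: locate QNRs that have no row in the research_data cache,
# as the daily cache_research_data_task does before recaching them.
# Import paths are assumptions inferred from this commit's diff.
from portal.database import db
from portal.models.questionnaire_response import QuestionnaireResponse
from portal.models.research_data import ResearchData


def missing_from_research_data_cache():
    """Return a query of QNRs absent from the research_data cache."""
    cached_ids = db.session.query(ResearchData.questionnaire_response_id)
    return QuestionnaireResponse.query.filter(
        QuestionnaireResponse.id.notin_(cached_ids))
```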
pbugni authored Sep 5, 2024
1 parent 28cc351 commit c17c6ff
Showing 13 changed files with 336 additions and 149 deletions.
8 changes: 4 additions & 4 deletions manage.py
@@ -756,8 +756,8 @@ def find_overlaps(correct_overlaps, reprocess_qnrs):
acting_user_id=admin.id,
)

-update_users_QBT(
-    patient.id, research_study_id=0, invalidate_existing=True)
+invalidate_users_QBT(user_id=patient.id, research_study_id=0)
+update_users_QBT(user_id=patient.id, research_study_id=0)
present_before_after_state(
patient.id, patient.external_study_id, b4)

@@ -838,8 +838,8 @@ def preview_site_update(org_id, retired):
research_study_id=0,
acting_user_id=admin.id,
)
-update_users_QBT(
-    patient.id, research_study_id=0, invalidate_existing=True)
+invalidate_users_QBT(user_id=patient.id, research_study_id=0)
+update_users_QBT(user_id=patient.id, research_study_id=0)
after_qnrs, after_timeline, qnrs_lost_reference, _ = present_before_after_state(
patient.id, patient.external_study_id, patient_state[patient.id])
total_qnrs += len(patient_state[patient.id]['qnrs'])
9 changes: 9 additions & 0 deletions portal/config/eproms/ScheduledJob.json
@@ -300,6 +300,15 @@
"resourceType": "ScheduledJob",
"schedule": "30 14 * * *",
"task": "cache_adherence_data_task"
+},
+{
+"active": true,
+"args": null,
+"kwargs": null,
+"name": "Cache Research Report Data",
+"resourceType": "ScheduledJob",
+"schedule": "30 23 * * *",
+"task": "cache_research_data_task"
}
],
"id": "SitePersistence v0.2",
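The `schedule` field is a standard five-field cron expression; `30 23 * * *` fires daily at 23:30. Assuming celery beat drives these ScheduledJob entries (the wiring isn't shown in this diff), the equivalent schedule object would be:

```
# Illustration only: "30 23 * * *" expressed as a celery crontab.
from celery.schedules import crontab

research_data_schedule = crontab(minute=30, hour=23)
```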
57 changes: 57 additions & 0 deletions portal/migrations/versions/daee63f50d35_.py
@@ -0,0 +1,57 @@
"""Add research_data table, to hold questionnaire response research data in a cache
Revision ID: daee63f50d35
Revises: cf586ed4f043
Create Date: 2024-05-21 17:00:58.204998
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = 'daee63f50d35'
down_revision = '6120fcfc474a'


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
'research_data',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('subject_id', sa.Integer(), nullable=False),
sa.Column('questionnaire_response_id', sa.Integer(), nullable=False),
sa.Column('instrument', sa.Text(), nullable=False),
sa.Column('research_study_id', sa.Integer(), nullable=False),
sa.Column('authored', sa.DateTime(), nullable=False),
sa.Column('data', postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.ForeignKeyConstraint(['subject_id'], ['users.id'], ),
sa.ForeignKeyConstraint(
['questionnaire_response_id'], ['questionnaire_responses.id'], ),
sa.PrimaryKeyConstraint('id'),
)

op.create_index(
op.f('ix_research_data_authored'), 'research_data', ['authored'], unique=False)
op.create_index(
op.f('ix_research_data_instrument'), 'research_data', ['instrument'], unique=False)
op.create_index(
op.f('ix_research_data_subject_id'), 'research_data', ['subject_id'], unique=False)
op.create_index(
op.f('ix_research_data_questionnaire_response_id'),
'research_data', ['questionnaire_response_id'], unique=True)
op.create_index(
op.f('ix_research_data_research_study_id'),
'research_data', ['research_study_id'], unique=False)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_research_data_research_study_id'), table_name='research_data')
op.drop_index(op.f('ix_research_data_questionnaire_response_id'), table_name='research_data')
op.drop_index(op.f('ix_research_data_subject_id'), table_name='research_data')
op.drop_index(op.f('ix_research_data_instrument'), table_name='research_data')
op.drop_index(op.f('ix_research_data_authored'), table_name='research_data')
op.drop_table('research_data')
# ### end Alembic commands ###
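
The `ResearchData` model module itself (portal/models/research_data.py) is among the changed files collapsed below. A sketch consistent with this migration's DDL, for orientation only — column names, indexes, and types follow the DDL; the import path and everything else is assumed:

```
# Sketch of the ResearchData model implied by the migration above;
# the actual module in the commit may differ.
from sqlalchemy.dialects.postgresql import JSONB

from ..database import db


class ResearchData(db.Model):
    """Cached research-report data, one row per questionnaire response"""
    __tablename__ = 'research_data'

    id = db.Column(db.Integer, primary_key=True)
    subject_id = db.Column(
        db.Integer, db.ForeignKey('users.id'), index=True, nullable=False)
    questionnaire_response_id = db.Column(
        db.Integer, db.ForeignKey('questionnaire_responses.id'),
        unique=True, nullable=False)
    instrument = db.Column(db.Text, index=True, nullable=False)
    research_study_id = db.Column(db.Integer, index=True, nullable=False)
    authored = db.Column(db.DateTime, index=True, nullable=False)
    data = db.Column(JSONB)
```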
34 changes: 14 additions & 20 deletions portal/models/qb_timeline.py
@@ -26,6 +26,7 @@
visit_name,
)
from .questionnaire_response import QNR_results, QuestionnaireResponse
+from .research_data import ResearchData
from .research_protocol import ResearchProtocol
from .role import ROLE
from .user import User
@@ -741,17 +742,23 @@ def ordered_qbs(user, research_study_id, classification=None):


def invalidate_users_QBT(user_id, research_study_id):
"""Mark the given user's QBT rows and adherence_data invalid (by deletion)
"""invalidate the given user's QBT rows and related cached data, by deletion
This also clears a users cached adherence and research data rows from their
respective caches.
:param user_id: user for whom to purge all QBT rows
:param research_study_id: set to limit invalidation to research study or
use string 'all' to invalidate all QBT rows for a user
"""
if research_study_id is None:
raise ValueError('research_study_id must be defined or use "all"')
if research_study_id == 'all':
QBT.query.filter(QBT.user_id == user_id).delete()
AdherenceData.query.filter(
AdherenceData.patient_id == user_id).delete()
+ResearchData.query.filter(ResearchData.subject_id == user_id).delete()
else:
QBT.query.filter(QBT.user_id == user_id).filter(
QBT.research_study_id == research_study_id).delete()
@@ -761,6 +768,8 @@ def invalidate_users_QBT(user_id, research_study_id):
# SQL alchemy can't combine `like` expression with delete op.
for ad in adh_data:
db.session.delete(ad)
+ResearchData.query.filter(ResearchData.subject_id == user_id).filter(
+    ResearchData.research_study_id == research_study_id).delete()

if not current_app.config.get("TESTING", False):
# clear the timeout lock as well, since we need a refresh
@@ -773,6 +782,7 @@ def invalidate_users_QBT(user_id, research_study_id):
cache_moderation.reset()


# clear cached qb_status_visit_name() using current as_of value
# args have to match order and values - no wild carding avail
as_of = QB_StatusCacheKey().current()
if research_study_id != 'all':
@@ -863,18 +873,17 @@ def int_or_none(value):
return True


-def update_users_QBT(user_id, research_study_id, invalidate_existing=False):
+def update_users_QBT(user_id, research_study_id):
"""Populate the QBT rows for given user, research_study
:param user: the user to add QBT rows for
:param research_study_id: the research study being processed
:param invalidate_existing: set true to wipe any current rows first
A user may be eligible for any number of research studies. QBT treats
each (user, research_study) independently, as should clients.
"""
-def attempt_update(user_id, research_study_id, invalidate_existing):
+def attempt_update(user_id, research_study_id):
"""Updates user's QBT or raises if lock is unattainable"""
from .qb_status import patient_research_study_status
from ..tasks import LOW_PRIORITY, cache_single_patient_adherence_data
@@ -886,18 +895,6 @@ def attempt_update(user_id, research_study_id, invalidate_existing):
user_id, research_study_id)

with TimeoutLock(key=key, timeout=timeout):
-if invalidate_existing:
-    QBT.query.filter(QBT.user_id == user_id).filter(
-        QBT.research_study_id == research_study_id).delete()
-    adh_data = AdherenceData.query.filter(
-        AdherenceData.patient_id == user_id).filter(
-        AdherenceData.rs_id_visit.like(f"{research_study_id}:%")
-    )
-    # SQL alchemy can't combine `like` expression with delete op.
-    for ad in adh_data:
-        db.session.delete(ad)
-    db.session.commit()

# if any rows are found, assume this user/study is current
if QBT.query.filter(QBT.user_id == user_id).filter(
QBT.research_study_id == research_study_id).count():
@@ -1206,10 +1203,7 @@ def attempt_update(user_id, research_study_id, invalidate_existing):
success = False
for attempt in range(1, 6):
try:
-attempt_update(
-    user_id=user_id,
-    research_study_id=research_study_id,
-    invalidate_existing=invalidate_existing)
+attempt_update(user_id=user_id, research_study_id=research_study_id)
success = True
break
except ConnectionError as ce:
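With the `invalidate_existing` flag gone, forcing a rebuild is now an explicit two-step call, as the manage.py changes above show. The pattern, with placeholder ids:

```
# Invalidate first (QBT rows plus cached adherence and research data),
# then repopulate the timeline; the ids below are placeholders.
from portal.models.qb_timeline import invalidate_users_QBT, update_users_QBT

user_id, research_study_id = 42, 0

invalidate_users_QBT(user_id=user_id, research_study_id=research_study_id)
update_users_QBT(user_id=user_id, research_study_id=research_study_id)
```

Separating invalidation from population keeps the update path free of deletes, and lets callers that only need a refresh skip invalidation entirely.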
105 changes: 26 additions & 79 deletions portal/models/questionnaire_response.py
@@ -8,7 +8,6 @@
from flask import current_app, has_request_context, url_for
from flask_swagger import swagger
import jsonschema
-from sqlalchemy import or_
from sqlalchemy.dialects.postgresql import ENUM, JSONB
from sqlalchemy.orm.exc import MultipleResultsFound

@@ -31,8 +30,8 @@
trigger_date,
visit_name,
)
+from .research_data import ResearchData
from .research_study import EMPRO_RS_ID, research_study_id_from_questionnaire
-from .reference import Reference
from .user import User, current_user, patients_query
from .user_consent import consent_withdrawal_dates

@@ -356,8 +355,7 @@ def quote_double_quote(value):
questionnaire_map = questionnaire.questionnaire_code_map()

for question in document.get('group', {}).get('question', ()):

-combined_answers = consolidate_answer_pairs(question['answer'])
+combined_answers = consolidate_answer_pairs(question.get('answer', ()))

# Separate out text and coded answer, then override text
text_and_coded_answers = []
@@ -372,7 +370,8 @@
answer['valueCoding'].get('text')
)

-text_and_coded_answers.append({'valueString': text_answer})
+if text_answer is not None:
+    text_and_coded_answers.append({'valueString': text_answer})
elif 'valueString' in answer and '"' in answer['valueString']:
answer['valueString'] = quote_double_quote(answer['valueString'])

@@ -844,15 +843,14 @@ def required_qs(self, qb_id):

def aggregate_responses(
instrument_ids, current_user, research_study_id, patch_dstu2=False,
-    ignore_qb_requirement=False, celery_task=None, patient_ids=None):
+    celery_task=None, patient_ids=None):
"""Build a bundle of QuestionnaireResponses
:param instrument_ids: list of instrument_ids to restrict results to
:param current_user: user making request, necessary to restrict results
to list of patients the current_user has permission to see
:param research_study_id: study being processed
:param patch_dstu2: set to make bundle DSTU2 compliant
:param ignore_qb_requirement: set to include all questionnaire responses
:param celery_task: if defined, send occasional progress updates
:param patient_ids: if defined, limit result set to given patient list
@@ -861,89 +859,38 @@
"""
from .qb_timeline import qb_status_visit_name # avoid cycle

if celery_task:
celery_task.update_state(
state='PROGRESS',
meta={'current': 1, 'total': 100})

# Gather up the patient IDs for whom current user has 'view' permission
user_ids = patients_query(
current_user,
include_test_role=False,
filter_by_ids=patient_ids,
).with_entities(User.id)

-annotated_questionnaire_responses = []
-questionnaire_responses = QuestionnaireResponse.query.filter(
-    QuestionnaireResponse.subject_id.in_(user_ids)).order_by(
-    QuestionnaireResponse.document['authored'].desc())

-# TN-3250, don't include QNRs without assigned visits, i.e. qb_id > 0
-if not ignore_qb_requirement:
-    questionnaire_responses = questionnaire_responses.filter(
-        QuestionnaireResponse.questionnaire_bank_id > 0)

-if instrument_ids:
-    instrument_filters = (
-        QuestionnaireResponse.document[
-            ("questionnaire", "reference")
-        ].astext.endswith(instrument_id)
-        for instrument_id in instrument_ids
-    )
-    questionnaire_responses = questionnaire_responses.filter(
-        or_(*instrument_filters))

-patient_fields = ("careProvider", "identifier")
-system_filter = current_app.config.get('REPORTING_IDENTIFIER_SYSTEMS')
if celery_task:
-    current, total = 0, questionnaire_responses.count()

-for questionnaire_response in questionnaire_responses:
-    document = questionnaire_response.document_answered.copy()
-    subject = questionnaire_response.subject
-    encounter = questionnaire_response.encounter
-    encounter_fhir = encounter.as_fhir()
-    document["encounter"] = encounter_fhir
+    celery_task.update_state(
+        state='PROGRESS',
+        meta={'current': 10, 'total': 100})

document["subject"] = {
k: v for k, v in subject.as_fhir().items() if k in patient_fields
}
query = ResearchData.query.filter(
ResearchData.subject_id.in_(user_ids)).order_by(
ResearchData.authored.desc(), ResearchData.subject_id).with_entities(
ResearchData.data)

-if subject.organizations:
-    providers = []
-    for org in subject.organizations:
-        org_ref = Reference.organization(org.id).as_fhir()
-        identifiers = [i.as_fhir() for i in org.identifiers if
-                       i.system in system_filter]
-        if identifiers:
-            org_ref['identifier'] = identifiers
-        providers.append(org_ref)
-    document["subject"]["careProvider"] = providers

-qb_status = qb_status_visit_name(
-    subject.id,
-    research_study_id,
-    FHIR_datetime.parse(questionnaire_response.document['authored']))
-document["timepoint"] = qb_status['visit_name']

-# Hack: add missing "resource" wrapper for DTSU2 compliance
-# Remove when all interventions compliant
-if patch_dstu2:
-    document = {
-        'resource': document,
-        # Todo: return URL to individual QuestionnaireResponse resource
-        'fullUrl': url_for(
-            'assessment_engine_api.assessment',
-            patient_id=subject.id,
-            _external=True,
-        ),
-    }

-annotated_questionnaire_responses.append(document)

-if celery_task:
-    current += 1
-    if current % 25 == 0:
-        celery_task.update_state(
-            state='PROGRESS',
-            meta={'current': current, 'total': total})

-return bundle_results(elements=annotated_questionnaire_responses)
+if instrument_ids:
+    query = query.filter(ResearchData.instrument.in_(tuple(instrument_ids)))
+if celery_task:
+    # the delay is now a single big query, and then bundling - mark near done
+    celery_task.update_state(
+        state='PROGRESS',
+        meta={'current': 70, 'total': 100})
+elements = [i.data for i in query.all()]
+return bundle_results(elements=elements)


def qnr_document_id(
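`aggregate_responses` now returns pre-built documents straight from the JSONB `data` column. Presumably each cached row holds the same annotated document the old loop assembled inline: the answered QuestionnaireResponse FHIR document plus `encounter`, a `subject` restricted to careProvider/identifier, and `timepoint`. A hypothetical payload, for illustration only (all values invented):

```
# Hypothetical shape of one research_data.data payload; the real structure
# is set by the cache population code, which isn't shown in this view.
cached_document = {
    "resourceType": "QuestionnaireResponse",
    "authored": "2024-05-21T17:00:58Z",
    "encounter": {},       # encounter.as_fhir()
    "subject": {           # careProvider / identifier fields only
        "careProvider": [],
        "identifier": [],
    },
    "timepoint": "Baseline visit",  # qb_status_visit_name()['visit_name']
}
```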
5 changes: 1 addition & 4 deletions portal/models/reporting.py
@@ -465,8 +465,7 @@ def patient_generator():

def research_report(
instrument_ids, research_study_id, acting_user_id, patch_dstu2,
-    request_url, response_format, lock_key, ignore_qb_requirement,
-    celery_task):
+    request_url, response_format, lock_key, celery_task):
"""Generates the research report
Designed to be executed in a background task - all inputs and outputs are
@@ -479,7 +478,6 @@ def research_report(
:param request_url: original request url, for inclusion in FHIR bundle
:param response_format: 'json' or 'csv'
:param lock_key: name of TimeoutLock key used to throttle requests
-:param ignore_qb_requirement: Set to include all questionnaire responses
:param celery_task: used to update status when run as a celery task
:return: dictionary of results, easily stored as a task output, including
any details needed to assist the view method
@@ -494,7 +492,6 @@ def research_report(
research_study_id=research_study_id,
current_user=acting_user,
patch_dstu2=patch_dstu2,
-ignore_qb_requirement=ignore_qb_requirement,
celery_task=celery_task
)
bundle.update({
(diffs for the remaining 7 changed files are not shown)
