From 96f26d4563d779c083be42fc35a173bf046512cc Mon Sep 17 00:00:00 2001 From: Paul Bugni Date: Mon, 14 Oct 2024 11:51:30 -0700 Subject: [PATCH] wrap research_data row addition in critical section, keyed on unique constraint, as we do rarely experience asynchronous collisions. --- portal/models/research_data.py | 82 ++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 39 deletions(-) diff --git a/portal/models/research_data.py b/portal/models/research_data.py index 771cb4613..46cbfe95e 100644 --- a/portal/models/research_data.py +++ b/portal/models/research_data.py @@ -97,46 +97,50 @@ def add_questionnaire_response(questionnaire_response, research_study_id): """ from .qb_timeline import qb_status_visit_name - + from ..timeout_lock import TimeoutLock # TN-3250, don't include QNRs without assigned visits, i.e. qb_id > 0 if not questionnaire_response.questionnaire_bank_id: return - instrument = questionnaire_response.document['questionnaire']['reference'].split('/')[-1] - if research_study_id is None: - research_study_id = research_study_id_from_questionnaire(instrument) - - patient_fields = ("careProvider", "identifier") - document = questionnaire_response.document_answered.copy() - subject = questionnaire_response.subject - document['encounter'] = questionnaire_response.encounter.as_fhir() - document["subject"] = { - k: v for k, v in subject.as_fhir().items() if k in patient_fields - } - - if subject.organizations: - providers = [] - for org in subject.organizations: - org_ref = Reference.organization(org.id).as_fhir() - identifiers = [i.as_fhir() for i in org.identifiers if i.system == "http://pcctc.org/"] - if identifiers: - org_ref['identifier'] = identifiers - providers.append(org_ref) - document["subject"]["careProvider"] = providers - - qb_status = qb_status_visit_name( - subject.id, - research_study_id, - FHIR_datetime.parse(questionnaire_response.document['authored'])) - document["timepoint"] = qb_status['visit_name'] - - research_data = ResearchData( - subject_id=subject.id, - questionnaire_response_id=questionnaire_response.id, - instrument=instrument, - research_study_id=research_study_id, - authored=FHIR_datetime.parse(document['authored']), - data=document - ) - db.session.add(research_data) - db.session.commit() + # Asynchronous requests, look out for threads updating the same subject, QNR + key = f"add_research_data.{questionnaire_response.subject_id}:{questionnaire_response.id}" + with TimeoutLock(key, expires=60, timeout=60): + instrument = questionnaire_response.document['questionnaire']['reference'].split('/')[-1] + if research_study_id is None: + research_study_id = research_study_id_from_questionnaire(instrument) + + patient_fields = ("careProvider", "identifier") + document = questionnaire_response.document_answered.copy() + subject = questionnaire_response.subject + document['encounter'] = questionnaire_response.encounter.as_fhir() + document["subject"] = { + k: v for k, v in subject.as_fhir().items() if k in patient_fields + } + + if subject.organizations: + providers = [] + for org in subject.organizations: + org_ref = Reference.organization(org.id).as_fhir() + identifiers = [ + i.as_fhir() for i in org.identifiers if i.system == "http://pcctc.org/"] + if identifiers: + org_ref['identifier'] = identifiers + providers.append(org_ref) + document["subject"]["careProvider"] = providers + + qb_status = qb_status_visit_name( + subject.id, + research_study_id, + FHIR_datetime.parse(questionnaire_response.document['authored'])) + document["timepoint"] = qb_status['visit_name'] + + research_data = ResearchData( + subject_id=subject.id, + questionnaire_response_id=questionnaire_response.id, + instrument=instrument, + research_study_id=research_study_id, + authored=FHIR_datetime.parse(document['authored']), + data=document + ) + db.session.add(research_data) + db.session.commit()