From cf78358cc87989c10fae2c11146bd6ed9e6e1a9e Mon Sep 17 00:00:00 2001
From: Yanyu Zheng
Date: Fri, 10 Sep 2021 10:17:13 -0400
Subject: [PATCH 1/4] fix: group export _since

---
 bulkExport/glueScripts/export-script.py   | 29 +++++++++++------------
 integration-tests/bulkExportTestHelper.ts |  7 ++++++
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/bulkExport/glueScripts/export-script.py b/bulkExport/glueScripts/export-script.py
index f7dd4cbf..9064343c 100644
--- a/bulkExport/glueScripts/export-script.py
+++ b/bulkExport/glueScripts/export-script.py
@@ -89,17 +89,6 @@ def remove_composite_id(resource):
 
 filtered_tenant_id_frame = Map.apply(frame = filtered_tenant_id_frame_with_composite_id, f = remove_composite_id)
 
-print('Start filtering by transactionTime and Since')
-# Filter by transactionTime and Since
-datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
-datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")
-
-filtered_dates_dyn_frame = Filter.apply(frame = filtered_tenant_id_frame,
-                                        f = lambda x:
-                                        datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
-                                        datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
-                                        )
-
 print ('start filtering by group_id')
 def is_active_group_member(member, datetime_transaction_time):
     if getattr(member, 'inactive', None) == True:
@@ -140,8 +129,10 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
                 return True
     return False
 
+datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")
+
 if (group_id is None):
-    filtered_group_frame = filtered_dates_dyn_frame
+    filtered_group_frame = filtered_tenant_id_frame
 else:
     print('Loading patient compartment search params')
     client = boto3.client('s3')
@@ -150,7 +141,7 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
     compartment_search_params = json.load(s3Obj['Body'])
 
     print('Extract group member ids')
-    group_members = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: x['id'] == group_id).toDF().collect()[0]['member']
+    group_members = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: x['id'] == group_id).toDF().collect()[0]['member']
     active_group_member_references = [x['entity']['reference'] for x in group_members if is_active_group_member(x, datetime_transaction_time) and is_internal_reference(x['entity']['reference'], server_url)]
     group_member_ids = set([x.split('/')[-1] for x in active_group_member_references])
     group_patient_ids = set([x.split('/')[-1] for x in active_group_member_references if x.split('/')[-2] == 'Patient'])
@@ -158,14 +149,22 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
     print(group_patient_ids)
 
     print('Extract group member and patient compartment dataframe')
-    filtered_group_frame = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))
+    filtered_group_frame = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))
 
+print('Start filtering by transactionTime and Since')
+# Filter by transactionTime and Since
+datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
+filtered_dates_dyn_frame = Filter.apply(frame = filtered_group_frame,
+                                        f = lambda x:
+                                        datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
+                                        datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
+                                        )
 print('Start filtering by documentStatus and resourceType')
 # Filter by resource listed in Type and with correct STATUS
 type_list = None if type == None else set(type.split(','))
 valid_document_state_to_be_read_from = {'AVAILABLE','LOCKED', 'PENDING_DELETE'}
-filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_group_frame,
+filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_dates_dyn_frame,
                                         f = lambda x:
                                             x["documentStatus"] in valid_document_state_to_be_read_from if type_list is None
                                             else x["documentStatus"] in valid_document_state_to_be_read_from and x["resourceType"] in type_list
                                         )
diff --git a/integration-tests/bulkExportTestHelper.ts b/integration-tests/bulkExportTestHelper.ts
index 394f72d0..d31771b4 100644
--- a/integration-tests/bulkExportTestHelper.ts
+++ b/integration-tests/bulkExportTestHelper.ts
@@ -149,6 +149,13 @@ export default class BulkExportTestHelper {
         }
     }
 
+    async updateResource(resource: any) {
+        const resourceToUpdate = cloneDeep(resource);
+        delete resourceToUpdate.meta;
+        const response = await this.fhirUserAxios.put(`/${resource.resourceType}/${resource.id}`, resourceToUpdate);
+        return response.data;
+    }
+
     // This method does not require FHIR user credentials in the header because the url is an S3 presigned URL
     static async downloadFile(url: string): Promise<any[]> {
         try {

From d8c2e9f6658260df12b72b459f988edf35db78be Mon Sep 17 00:00:00 2001
From: Robert Smayda
Date: Tue, 21 Sep 2021 13:57:49 -0400
Subject: [PATCH 2/4] fix: handle when failures happen in bulk export (#452)

---
 bulkExport/state-machine-definition.yaml | 57 ++++++++++++------------
 1 file changed, 29 insertions(+), 28 deletions(-)

diff --git a/bulkExport/state-machine-definition.yaml b/bulkExport/state-machine-definition.yaml
index 9256c92b..386ee0b7 100644
--- a/bulkExport/state-machine-definition.yaml
+++ b/bulkExport/state-machine-definition.yaml
@@ -5,51 +5,52 @@ id: BulkExportStateMachine
 
 definition:
-  Comment: "State machine that executes a FHIR bulk export job"
+  Comment: 'State machine that executes a FHIR bulk export job'
   StartAt: parallelHelper
   States:
     catchAllUpdateStatusToFailed:
       Type: Task
       Resource: !GetAtt updateStatus.Arn
-      Parameters: {"jobId.$":"$.jobId", "status": "failed"}
+      Parameters: { 'globalParams.$': '$', 'status': 'failed' }
       Retry:
-        - ErrorEquals: [ "States.ALL" ]
+        - ErrorEquals: ['States.ALL']
       End: true
     parallelHelper:
       Type: Parallel
       End: true
       Catch:
-        - ErrorEquals: [ "States.ALL" ]
+        - ErrorEquals: ['States.ALL']
           Next: catchAllUpdateStatusToFailed
+          ResultPath: '$.error'
       Branches:
         - StartAt: startExportJob
           States:
             updateStatusToFailed:
              Type: Task
              Resource: !GetAtt updateStatus.Arn
-             Parameters: {"globalParams.$":"$", "status": "failed"}
+             Parameters: { 'globalParams.$': '$', 'status': 'failed' }
              Retry:
-               - ErrorEquals: [ "States.ALL" ]
+               - ErrorEquals: ['States.ALL']
              End: true
            updateStatusToCanceled:
              Type: Task
              Resource: !GetAtt updateStatus.Arn
-             Parameters: {"globalParams.$":"$", "status": "canceled"}
+             Parameters: { 'globalParams.$': '$', 'status': 'canceled' }
              Retry:
-               - ErrorEquals: [ "States.ALL" ]
+               - ErrorEquals: ['States.ALL']
              End: true
            updateStatusToCompleted:
              Type: Task
              Resource: !GetAtt updateStatus.Arn
-             Parameters: {"globalParams.$":"$", "status": "completed"}
+             Parameters: { 'globalParams.$': '$', 'status': 'completed' }
              Retry:
[ "States.ALL" ] + - ErrorEquals: ['States.ALL'] End: true startExportJob: Type: Task Resource: !GetAtt startExportJob.Arn Retry: - - ErrorEquals: [ "States.ALL" ] + - ErrorEquals: ['States.ALL'] Next: waitForExportJob waitForExportJob: Type: Wait @@ -59,37 +60,37 @@ definition: Type: Task Resource: !GetAtt getJobStatus.Arn Retry: - - ErrorEquals: [ "States.ALL" ] + - ErrorEquals: ['States.ALL'] Next: choiceOnJobStatus choiceOnJobStatus: Type: Choice Choices: - - Variable: "$.executionParameters.isCanceled" + - Variable: '$.executionParameters.isCanceled' BooleanEquals: true Next: stopExportJob - - Variable: "$.executionParameters.glueJobRunStatus" + - Variable: '$.executionParameters.glueJobRunStatus' StringEquals: 'SUCCEEDED' Next: updateStatusToCompleted - Or: - - Variable: "$.executionParameters.glueJobRunStatus" - StringEquals: 'STARTING' - - Variable: "$.executionParameters.glueJobRunStatus" - StringEquals: 'RUNNING' + - Variable: '$.executionParameters.glueJobRunStatus' + StringEquals: 'STARTING' + - Variable: '$.executionParameters.glueJobRunStatus' + StringEquals: 'RUNNING' Next: waitForExportJob - Or: - - Variable: "$.executionParameters.glueJobRunStatus" - StringEquals: 'FAILED' - - Variable: "$.executionParameters.glueJobRunStatus" - StringEquals: 'TIMEOUT' - - Variable: "$.executionParameters.glueJobRunStatus" - # STOPPING and STOPPED can only occur here if the job was forcefully stopped with a Glue API call from outside the FHIR server, so we treat it as failure - StringEquals: 'STOPPING' - - Variable: "$.executionParameters.glueJobRunStatus" - StringEquals: 'STOPPED' + - Variable: '$.executionParameters.glueJobRunStatus' + StringEquals: 'FAILED' + - Variable: '$.executionParameters.glueJobRunStatus' + StringEquals: 'TIMEOUT' + - Variable: '$.executionParameters.glueJobRunStatus' + # STOPPING and STOPPED can only occur here if the job was forcefully stopped with a Glue API call from outside the FHIR server, so we treat it as failure + StringEquals: 'STOPPING' + - Variable: '$.executionParameters.glueJobRunStatus' + StringEquals: 'STOPPED' Next: updateStatusToFailed stopExportJob: Type: Task Resource: !GetAtt stopExportJob.Arn Retry: - - ErrorEquals: [ "States.ALL" ] + - ErrorEquals: ['States.ALL'] Next: updateStatusToCanceled From 30a76eb3be32f0186071f0cbe7fc6d7fb94b0349 Mon Sep 17 00:00:00 2001 From: Yanyu Zheng Date: Thu, 23 Sep 2021 13:23:55 -0400 Subject: [PATCH 3/4] fix: Patient compartment array inclusion in group export (#455) --- bulkExport/glueScripts/export-script.py | 24 ++++++++++------- integration-tests/bulkExportTestHelper.ts | 6 ++--- .../createGroupMembersBundle.json | 27 +++++++++++++++++++ 3 files changed, 44 insertions(+), 13 deletions(-) diff --git a/bulkExport/glueScripts/export-script.py b/bulkExport/glueScripts/export-script.py index 9064343c..3c74ffdc 100644 --- a/bulkExport/glueScripts/export-script.py +++ b/bulkExport/glueScripts/export-script.py @@ -109,11 +109,12 @@ def is_internal_reference(reference, server_url): return False def deep_get(resource, path): - if resource is None: - return None - if len(path) is 1: - return resource[path[0]]['reference'] - return deep_get(resource[path[0]], path.pop(0)) + temp = resource + for p in path: + if temp is None: + return None + temp = temp[p] + return temp def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url): # Check if resource is part of the group @@ -122,11 +123,16 @@ def is_included_in_group_export(resource, group_member_ids, 
@@ -122,11 +123,16 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
     # Check if resource is part of the patient compartment
     if resource['resourceType'] in compartment_search_params:
         # Get inclusion criteria paths for the resource
-        inclusion_paths = compartment_search_params[resource.resourceType]
+        inclusion_paths = compartment_search_params[resource['resourceType']]
         for path in inclusion_paths:
             reference = deep_get(resource, path.split("."))
-            if is_internal_reference(reference, server_url) and reference.split('/')[-1] in group_patient_ids:
-                return True
+            if isinstance(reference, dict):
+                reference = [reference]
+            elif not isinstance(reference, list):
+                return False # Inclusion criteria should point to a dict {reference: 'Patient/1234'} or a list of references
+            for ref in reference:
+                if is_internal_reference(ref['reference'], server_url) and ref['reference'].split('/')[-1] in group_patient_ids:
+                    return True
     return False
 
 datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")
@@ -141,7 +147,7 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
     compartment_search_params = json.load(s3Obj['Body'])
 
     print('Extract group member ids')
-    group_members = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: x['id'] == group_id).toDF().collect()[0]['member']
+    group_members = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: x['id'] == group_id).toDF().sort("meta.versionId").collect()[-1]['member']
     active_group_member_references = [x['entity']['reference'] for x in group_members if is_active_group_member(x, datetime_transaction_time) and is_internal_reference(x['entity']['reference'], server_url)]
     group_member_ids = set([x.split('/')[-1] for x in active_group_member_references])
     group_patient_ids = set([x.split('/')[-1] for x in active_group_member_references if x.split('/')[-2] == 'Patient'])
diff --git a/integration-tests/bulkExportTestHelper.ts b/integration-tests/bulkExportTestHelper.ts
index d31771b4..b77c6f68 100644
--- a/integration-tests/bulkExportTestHelper.ts
+++ b/integration-tests/bulkExportTestHelper.ts
@@ -196,10 +196,8 @@ export default class BulkExportTestHelper {
         if (swapBundleInternalReference) {
             let resourcesString = JSON.stringify(resources);
             urlToReferenceList.forEach(item => {
-                resourcesString = resourcesString.replace(
-                    `"reference":"${item.url}"`,
-                    `"reference":"${item.reference}"`,
-                );
+                const regEx = new RegExp(`"reference":"${item.url}"`, 'g');
+                resourcesString = resourcesString.replace(regEx, `"reference":"${item.reference}"`);
             });
             resources = JSON.parse(resourcesString);
         }
diff --git a/integration-tests/createGroupMembersBundle.json b/integration-tests/createGroupMembersBundle.json
index 619f7a57..3e54253e 100644
--- a/integration-tests/createGroupMembersBundle.json
+++ b/integration-tests/createGroupMembersBundle.json
@@ -83,6 +83,33 @@
                 "method": "POST",
                 "url": "Encounter"
             }
+        },
+        {
+            "fullUrl": "urn:uuid:6ad9a6b5-44fb-4eae-9544-a36d5c05c780",
+            "resource": {
+                "resourceType": "Provenance",
+                "target": [
+                    {
+                        "reference": "Procedure/example/_history/1"
+                    },
+                    {
+                        "reference": "urn:uuid:fcfe413c-c62d-4097-9e31-02ff6ff523ad"
+                    }
+                ],
+                "recorded": "2015-06-27T08:39:24+10:00",
+                "agent": [
+                    {
+                        "who": {
+                            "reference": "Practitioner/xcda-author"
+                        }
+                    }
+                ]
+            },
+            "request":
+            {
+                "method": "POST",
+                "url": "Provenance"
+            }
         }
     ]
 }
\ No newline at end of file

From 1619ef98209b9843cb446b2bc044cddba6a86d5b Mon Sep 17 00:00:00 2001
From: zheyanyu
Date: Mon, 27 Sep 2021 14:20:53 -0400
Subject: [PATCH 4/4] test: add test that had merge conflict

---
 integration-tests/bulkExport.test.ts | 31 ++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/integration-tests/bulkExport.test.ts b/integration-tests/bulkExport.test.ts
index c301acb7..1dc50b93 100644
--- a/integration-tests/bulkExport.test.ts
+++ b/integration-tests/bulkExport.test.ts
@@ -109,6 +109,37 @@ describe('Bulk Export', () => {
         );
     });
 
+    test('Successfully export group members last updated after _since timestamp in a group last updated before the _since timestamp', async () => {
+        // BUILD
+        const createdResourceBundleResponse = await bulkExportTestHelper.sendCreateGroupRequest();
+        const resTypToResExpectedInExport = bulkExportTestHelper.getResources(
+            createdResourceBundleResponse,
+            createGroupMembersBundle,
+            true,
+        );
+        // sleep 10 seconds to make tests more resilient to clock skew when running locally.
+        await sleep(10_000);
+        const currentTime = new Date();
+        // Update patient resource so the last updated time is after currentTime
+        const updatedPatientResource = await bulkExportTestHelper.updateResource(
+            resTypToResExpectedInExport.Patient,
+        );
+
+        // OPERATE
+        const groupId = resTypToResExpectedInExport.Group.id;
+        const statusPollUrl = await bulkExportTestHelper.startExportJob({
+            exportType: 'group',
+            groupId,
+            since: currentTime,
+        });
+        const responseBody = await bulkExportTestHelper.getExportStatus(statusPollUrl);
+
+        // CHECK
+        return bulkExportTestHelper.checkResourceInExportedFiles(responseBody.output, {
+            Patient: updatedPatientResource,
+        });
+    });
+
     test('Does not include inactive members in group export', async () => {
         // BUILD
         const createdResourceBundleResponse = await bulkExportTestHelper.sendCreateGroupRequest({ inactive: true });