From 4d861046e27bd22794a9e1f616c4a81dbd95fde5 Mon Sep 17 00:00:00 2001
From: Yanyu Zheng
Date: Tue, 28 Sep 2021 15:40:32 -0400
Subject: [PATCH] fix: group export (#460)

---
 bulkExport/glueScripts/export-script.py   | 51 +++++++++--------
 bulkExport/state-machine-definition.yaml  | 57 ++++++++++---------
 integration-tests/bulkExport.test.ts      | 31 ++++++++++
 integration-tests/bulkExportTestHelper.ts | 13 +++--
 .../createGroupMembersBundle.json          | 27 +++++++++
 5 files changed, 124 insertions(+), 55 deletions(-)

diff --git a/bulkExport/glueScripts/export-script.py b/bulkExport/glueScripts/export-script.py
index f7dd4cbf..3c74ffdc 100644
--- a/bulkExport/glueScripts/export-script.py
+++ b/bulkExport/glueScripts/export-script.py
@@ -89,17 +89,6 @@ def remove_composite_id(resource):
 
 filtered_tenant_id_frame = Map.apply(frame = filtered_tenant_id_frame_with_composite_id, f = remove_composite_id)
 
-print('Start filtering by transactionTime and Since')
-# Filter by transactionTime and Since
-datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
-datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")
-
-filtered_dates_dyn_frame = Filter.apply(frame = filtered_tenant_id_frame,
-                                        f = lambda x:
-                                        datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
-                                        datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
-                                        )
-
 print ('start filtering by group_id')
 def is_active_group_member(member, datetime_transaction_time):
     if getattr(member, 'inactive', None) == True:
@@ -120,11 +109,12 @@ def is_internal_reference(reference, server_url):
     return False
 
 def deep_get(resource, path):
-    if resource is None:
-        return None
-    if len(path) is 1:
-        return resource[path[0]]['reference']
-    return deep_get(resource[path[0]], path.pop(0))
+    temp = resource
+    for p in path:
+        if temp is None:
+            return None
+        temp = temp[p]
+    return temp
 
 def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url):
     # Check if resource is part of the group
@@ -133,15 +123,22 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
     # Check if resource is part of the patient compartment
     if resource['resourceType'] in compartment_search_params:
         # Get inclusion criteria paths for the resource
-        inclusion_paths = compartment_search_params[resource.resourceType]
+        inclusion_paths = compartment_search_params[resource['resourceType']]
         for path in inclusion_paths:
             reference = deep_get(resource, path.split("."))
-            if is_internal_reference(reference, server_url) and reference.split('/')[-1] in group_patient_ids:
-                return True
+            if isinstance(reference, dict):
+                reference = [reference]
+            elif not isinstance(reference, list):
+                return False # Inclusion criteria should point to a dict {reference: 'Patient/1234'} or a list of references
+            for ref in reference:
+                if is_internal_reference(ref['reference'], server_url) and ref['reference'].split('/')[-1] in group_patient_ids:
+                    return True
     return False
 
+datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")
+
 if (group_id is None):
-    filtered_group_frame = filtered_dates_dyn_frame
+    filtered_group_frame = filtered_tenant_id_frame
 else:
     print('Loading patient compartment search params')
     client = boto3.client('s3')
@@ -150,7 +147,7 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
     compartment_search_params = json.load(s3Obj['Body'])
 
     print('Extract group member ids')
-    group_members = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: x['id'] == group_id).toDF().collect()[0]['member']
+    group_members = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: x['id'] == group_id).toDF().sort("meta.versionId").collect()[-1]['member']
     active_group_member_references = [x['entity']['reference'] for x in group_members if is_active_group_member(x, datetime_transaction_time) and is_internal_reference(x['entity']['reference'], server_url)]
     group_member_ids = set([x.split('/')[-1] for x in active_group_member_references])
     group_patient_ids = set([x.split('/')[-1] for x in active_group_member_references if x.split('/')[-2] == 'Patient'])
@@ -158,14 +155,22 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
     print(group_patient_ids)
 
     print('Extract group member and patient compartment dataframe')
-    filtered_group_frame = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))
+    filtered_group_frame = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))
 
+print('Start filtering by transactionTime and Since')
+# Filter by transactionTime and Since
+datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
+filtered_dates_dyn_frame = Filter.apply(frame = filtered_group_frame,
+                                        f = lambda x:
+                                        datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
+                                        datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
+                                        )
 
 print('Start filtering by documentStatus and resourceType')
 # Filter by resource listed in Type and with correct STATUS
 type_list = None if type == None else set(type.split(','))
 valid_document_state_to_be_read_from = {'AVAILABLE','LOCKED', 'PENDING_DELETE'}
-filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_group_frame,
+filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_dates_dyn_frame,
                                         f = lambda x:
                                         x["documentStatus"] in valid_document_state_to_be_read_from if type_list is None
                                         else x["documentStatus"] in valid_document_state_to_be_read_from and x["resourceType"] in type_list
diff --git a/bulkExport/state-machine-definition.yaml b/bulkExport/state-machine-definition.yaml
index 9256c92b..386ee0b7 100644
--- a/bulkExport/state-machine-definition.yaml
+++ b/bulkExport/state-machine-definition.yaml
@@ -5,51 +5,52 @@
 id: BulkExportStateMachine
 definition:
-  Comment: "State machine that executes a FHIR bulk export job"
+  Comment: 'State machine that executes a FHIR bulk export job'
   StartAt: parallelHelper
   States:
     catchAllUpdateStatusToFailed:
       Type: Task
       Resource: !GetAtt updateStatus.Arn
-      Parameters: {"jobId.$":"$.jobId", "status": "failed"}
+      Parameters: { 'globalParams.$': '$', 'status': 'failed' }
       Retry:
-        - ErrorEquals: [ "States.ALL" ]
+        - ErrorEquals: ['States.ALL']
       End: true
     parallelHelper:
       Type: Parallel
      End: true
       Catch:
-        - ErrorEquals: [ "States.ALL" ]
+        - ErrorEquals: ['States.ALL']
           Next: catchAllUpdateStatusToFailed
+          ResultPath: '$.error'
       Branches:
         - StartAt: startExportJob
           States:
            updateStatusToFailed:
              Type: Task
              Resource: !GetAtt updateStatus.Arn
-              Parameters: {"globalParams.$":"$", "status": "failed"}
+              Parameters: { 'globalParams.$': '$', 'status': 'failed' }
              Retry:
-                - ErrorEquals: [ "States.ALL" ]
+                - ErrorEquals: ['States.ALL']
              End: true
            updateStatusToCanceled:
              Type: Task
              Resource: !GetAtt updateStatus.Arn
-              Parameters: {"globalParams.$":"$", "status": "canceled"}
+              Parameters: { 'globalParams.$': '$', 'status': 'canceled' }
              Retry:
-                - ErrorEquals: [ "States.ALL" ]
+                - ErrorEquals: ['States.ALL']
              End: true
            updateStatusToCompleted:
              Type: Task
              Resource: !GetAtt updateStatus.Arn
-              Parameters: {"globalParams.$":"$", "status": "completed"}
+              Parameters: { 'globalParams.$': '$', 'status': 'completed' }
              Retry:
-                - ErrorEquals: [ "States.ALL" ]
+                - ErrorEquals: ['States.ALL']
              End: true
            startExportJob:
              Type: Task
              Resource: !GetAtt startExportJob.Arn
              Retry:
-                - ErrorEquals: [ "States.ALL" ]
+                - ErrorEquals: ['States.ALL']
              Next: waitForExportJob
            waitForExportJob:
              Type: Wait
@@ -59,37 +60,37 @@ definition:
              Type: Task
              Resource: !GetAtt getJobStatus.Arn
              Retry:
-                - ErrorEquals: [ "States.ALL" ]
+                - ErrorEquals: ['States.ALL']
              Next: choiceOnJobStatus
            choiceOnJobStatus:
              Type: Choice
              Choices:
-                - Variable: "$.executionParameters.isCanceled"
+                - Variable: '$.executionParameters.isCanceled'
                  BooleanEquals: true
                  Next: stopExportJob
-                - Variable: "$.executionParameters.glueJobRunStatus"
+                - Variable: '$.executionParameters.glueJobRunStatus'
                  StringEquals: 'SUCCEEDED'
                  Next: updateStatusToCompleted
                - Or:
-                  - Variable: "$.executionParameters.glueJobRunStatus"
-                    StringEquals: 'STARTING'
-                  - Variable: "$.executionParameters.glueJobRunStatus"
-                    StringEquals: 'RUNNING'
+                    - Variable: '$.executionParameters.glueJobRunStatus'
+                      StringEquals: 'STARTING'
+                    - Variable: '$.executionParameters.glueJobRunStatus'
+                      StringEquals: 'RUNNING'
                  Next: waitForExportJob
                - Or:
-                  - Variable: "$.executionParameters.glueJobRunStatus"
-                    StringEquals: 'FAILED'
-                  - Variable: "$.executionParameters.glueJobRunStatus"
-                    StringEquals: 'TIMEOUT'
-                  - Variable: "$.executionParameters.glueJobRunStatus"
-                    # STOPPING and STOPPED can only occur here if the job was forcefully stopped with a Glue API call from outside the FHIR server, so we treat it as failure
-                    StringEquals: 'STOPPING'
-                  - Variable: "$.executionParameters.glueJobRunStatus"
-                    StringEquals: 'STOPPED'
+                    - Variable: '$.executionParameters.glueJobRunStatus'
+                      StringEquals: 'FAILED'
+                    - Variable: '$.executionParameters.glueJobRunStatus'
+                      StringEquals: 'TIMEOUT'
+                    - Variable: '$.executionParameters.glueJobRunStatus'
+                      # STOPPING and STOPPED can only occur here if the job was forcefully stopped with a Glue API call from outside the FHIR server, so we treat it as failure
+                      StringEquals: 'STOPPING'
+                    - Variable: '$.executionParameters.glueJobRunStatus'
+                      StringEquals: 'STOPPED'
                  Next: updateStatusToFailed
            stopExportJob:
              Type: Task
              Resource: !GetAtt stopExportJob.Arn
              Retry:
-                - ErrorEquals: [ "States.ALL" ]
+                - ErrorEquals: ['States.ALL']
              Next: updateStatusToCanceled
diff --git a/integration-tests/bulkExport.test.ts b/integration-tests/bulkExport.test.ts
index c301acb7..1dc50b93 100644
--- a/integration-tests/bulkExport.test.ts
+++ b/integration-tests/bulkExport.test.ts
@@ -109,6 +109,37 @@ describe('Bulk Export', () => {
         );
     });
 
+    test('Successfully export group members last updated after _since timestamp in a group last updated before the _since timestamp', async () => {
+        // BUILD
+        const createdResourceBundleResponse = await bulkExportTestHelper.sendCreateGroupRequest();
+        const resTypToResExpectedInExport = bulkExportTestHelper.getResources(
+            createdResourceBundleResponse,
+            createGroupMembersBundle,
+            true,
+        );
+        // sleep 10 seconds to make tests more resilient to clock skew when running locally.
+        await sleep(10_000);
+        const currentTime = new Date();
+        // Update patient resource so the last updated time is after currentTime
+        const updatedPatientResource = await bulkExportTestHelper.updateResource(
+            resTypToResExpectedInExport.Patient,
+        );
+
+        // OPERATE
+        const groupId = resTypToResExpectedInExport.Group.id;
+        const statusPollUrl = await bulkExportTestHelper.startExportJob({
+            exportType: 'group',
+            groupId,
+            since: currentTime,
+        });
+        const responseBody = await bulkExportTestHelper.getExportStatus(statusPollUrl);
+
+        // CHECK
+        return bulkExportTestHelper.checkResourceInExportedFiles(responseBody.output, {
+            Patient: updatedPatientResource,
+        });
+    });
+
     test('Does not include inactive members in group export', async () => {
         // BUILD
         const createdResourceBundleResponse = await bulkExportTestHelper.sendCreateGroupRequest({ inactive: true });
diff --git a/integration-tests/bulkExportTestHelper.ts b/integration-tests/bulkExportTestHelper.ts
index 394f72d0..b77c6f68 100644
--- a/integration-tests/bulkExportTestHelper.ts
+++ b/integration-tests/bulkExportTestHelper.ts
@@ -149,6 +149,13 @@ export default class BulkExportTestHelper {
         }
     }
 
+    async updateResource(resource: any) {
+        const resourceToUpdate = cloneDeep(resource);
+        delete resourceToUpdate.meta;
+        const response = await this.fhirUserAxios.put(`/${resource.resourceType}/${resource.id}`, resourceToUpdate);
+        return response.data;
+    }
+
     // This method does not require FHIR user credentials in the header because the url is an S3 presigned URL
     static async downloadFile(url: string): Promise<any[]> {
         try {
@@ -189,10 +196,8 @@ export default class BulkExportTestHelper {
         if (swapBundleInternalReference) {
             let resourcesString = JSON.stringify(resources);
             urlToReferenceList.forEach(item => {
-                resourcesString = resourcesString.replace(
-                    `"reference":"${item.url}"`,
-                    `"reference":"${item.reference}"`,
-                );
+                const regEx = new RegExp(`"reference":"${item.url}"`, 'g');
+                resourcesString = resourcesString.replace(regEx, `"reference":"${item.reference}"`);
             });
             resources = JSON.parse(resourcesString);
         }
diff --git a/integration-tests/createGroupMembersBundle.json b/integration-tests/createGroupMembersBundle.json
index 619f7a57..3e54253e 100644
--- a/integration-tests/createGroupMembersBundle.json
+++ b/integration-tests/createGroupMembersBundle.json
@@ -83,6 +83,33 @@
                 "method": "POST",
                 "url": "Encounter"
             }
+        },
+        {
+            "fullUrl": "urn:uuid:6ad9a6b5-44fb-4eae-9544-a36d5c05c780",
+            "resource": {
+                "resourceType": "Provenance",
+                "target": [
+                    {
+                        "reference": "Procedure/example/_history/1"
+                    },
+                    {
+                        "reference": "urn:uuid:fcfe413c-c62d-4097-9e31-02ff6ff523ad"
+                    }
+                ],
+                "recorded": "2015-06-27T08:39:24+10:00",
+                "agent": [
+                    {
+                        "who": {
+                            "reference": "Practitioner/xcda-author"
+                        }
+                    }
+                ]
+            },
+            "request":
+            {
+                "method": "POST",
+                "url": "Provenance"
+            }
         }
     ]
 }
\ No newline at end of file
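
Annotation (not part of the patch): the inclusion logic added to export-script.py above can be exercised on its own. The sketch below is adapted to plain Python dicts; in the Glue job these functions run over DynamicFrame records, is_internal_reference here is a simplified stand-in for the helper already defined earlier in the script, the direct-membership check elided from the hunks is assumed to be a set lookup on the resource id, and the sample Observation, compartment paths, and ids are hypothetical.

    def deep_get(resource, path):
        # Walk one path segment at a time; short-circuit to None when a segment is missing.
        temp = resource
        for p in path:
            if temp is None:
                return None
            temp = temp.get(p) if isinstance(temp, dict) else None
        return temp

    def is_internal_reference(reference, server_url):
        # Simplified stand-in: relative references and references prefixed with
        # this server's URL count as internal.
        if reference is None:
            return False
        return not reference.startswith('http') or reference.startswith(server_url)

    def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url):
        # Direct group members are always included (assumed set lookup on id).
        if resource['id'] in group_member_ids:
            return True
        # Otherwise include resources that point at one of the group's patients
        # through a patient compartment inclusion path.
        if resource['resourceType'] in compartment_search_params:
            for path in compartment_search_params[resource['resourceType']]:
                reference = deep_get(resource, path.split('.'))
                # A path may resolve to a single {'reference': ...} dict or a list of them.
                if isinstance(reference, dict):
                    reference = [reference]
                elif not isinstance(reference, list):
                    return False
                for ref in reference:
                    if is_internal_reference(ref['reference'], server_url) and ref['reference'].split('/')[-1] in group_patient_ids:
                        return True
        return False

    # Hypothetical sample data: an Observation points at its patient via 'subject'.
    compartment_search_params = {'Observation': ['subject']}
    observation = {'resourceType': 'Observation', 'id': 'obs-1', 'subject': {'reference': 'Patient/1234'}}
    print(is_included_in_group_export(observation, set(), {'1234'}, compartment_search_params, 'https://fhir.example.com/'))  # True

This also makes the reordering in the first hunk easier to see: the group/compartment filter now runs over the full tenant frame, and the _since/transactionTime window is applied afterwards, so members updated after _since are still exported even when the Group resource itself was last updated before _since (the scenario covered by the new integration test).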