This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

Commit

fix: group export (#460)
Bingjiling authored Sep 28, 2021
1 parent 1246252 commit 4d86104
Showing 5 changed files with 124 additions and 55 deletions.
51 changes: 28 additions & 23 deletions bulkExport/glueScripts/export-script.py
@@ -89,17 +89,6 @@ def remove_composite_id(resource):

filtered_tenant_id_frame = Map.apply(frame = filtered_tenant_id_frame_with_composite_id, f = remove_composite_id)

-print('Start filtering by transactionTime and Since')
-# Filter by transactionTime and Since
-datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
-datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")
-
-filtered_dates_dyn_frame = Filter.apply(frame = filtered_tenant_id_frame,
-    f = lambda x:
-    datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
-    datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
-)

print ('start filtering by group_id')
def is_active_group_member(member, datetime_transaction_time):
if getattr(member, 'inactive', None) == True:
@@ -120,11 +109,12 @@ def is_internal_reference(reference, server_url):
return False

def deep_get(resource, path):
-    if resource is None:
-        return None
-    if len(path) is 1:
-        return resource[path[0]]['reference']
-    return deep_get(resource[path[0]], path.pop(0))
+    temp = resource
+    for p in path:
+        if temp is None:
+            return None
+        temp = temp[p]
+    return temp
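Note on this hunk: the old recursive deep_get was broken for multi-segment paths. `len(path) is 1` tested identity rather than equality, the leaf lookup hard-coded a trailing ['reference'] key, and the recursive call passed `path.pop(0)` (the removed first segment, a string) where the remaining path list was needed. The replacement walks the path iteratively and returns None as soon as a branch is missing. A minimal standalone sketch, with hypothetical data:

    def deep_get(resource, path):
        temp = resource
        for p in path:
            if temp is None:
                return None
            temp = temp[p]
        return temp

    observation = {'subject': {'reference': 'Patient/1234'}}
    print(deep_get(observation, 'subject'.split('.')))                  # {'reference': 'Patient/1234'}
    print(deep_get({'subject': None}, 'subject.reference'.split('.')))  # None

Note that the helper now returns the reference object itself (a dict or a list), not the reference string; the caller below does the ['reference'] lookup.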

def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url):
# Check if resource is part of the group
@@ -133,15 +123,22 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url):
# Check if resource is part of the patient compartment
if resource['resourceType'] in compartment_search_params:
# Get inclusion criteria paths for the resource
-inclusion_paths = compartment_search_params[resource.resourceType]
+inclusion_paths = compartment_search_params[resource['resourceType']]
for path in inclusion_paths:
reference = deep_get(resource, path.split("."))
-if is_internal_reference(reference, server_url) and reference.split('/')[-1] in group_patient_ids:
-    return True
+if isinstance(reference, dict):
+    reference = [reference]
+elif not isinstance(reference, list):
+    return False # Inclusion criteria should point to a dict {reference: 'Patient/1234'} or a list of references
+for ref in reference:
+    if is_internal_reference(ref['reference'], server_url) and ref['reference'].split('/')[-1] in group_patient_ids:
+        return True
return False
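Note on this hunk: a patient-compartment inclusion path can resolve to a single reference object (e.g. Observation.subject) or to a list of references (e.g. Provenance.target, which the new entry in createGroupMembersBundle.json below exercises). The added branch normalizes the dict case to a one-element list, then checks every entry. A rough illustration with hypothetical data:

    references_by_shape = {
        'dict': {'reference': 'Patient/1234'},
        'list': [{'reference': 'Practitioner/xcda-author'}, {'reference': 'Patient/1234'}],
    }
    for shape, reference in references_by_shape.items():
        if isinstance(reference, dict):
            reference = [reference]
        ids = [ref['reference'].split('/')[-1] for ref in reference]
        print(shape, ids)   # dict ['1234'] then list ['xcda-author', '1234']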

+datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")

if (group_id is None):
-    filtered_group_frame = filtered_dates_dyn_frame
+    filtered_group_frame = filtered_tenant_id_frame
else:
print('Loading patient compartment search params')
client = boto3.client('s3')
@@ -150,22 +147,30 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, compartment_search_params, server_url):
compartment_search_params = json.load(s3Obj['Body'])

print('Extract group member ids')
-group_members = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: x['id'] == group_id).toDF().collect()[0]['member']
+group_members = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: x['id'] == group_id).toDF().sort("meta.versionId").collect()[-1]['member']
active_group_member_references = [x['entity']['reference'] for x in group_members if is_active_group_member(x, datetime_transaction_time) and is_internal_reference(x['entity']['reference'], server_url)]
group_member_ids = set([x.split('/')[-1] for x in active_group_member_references])
group_patient_ids = set([x.split('/')[-1] for x in active_group_member_references if x.split('/')[-2] == 'Patient'])
print(group_member_ids)
print(group_patient_ids)
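Note on this hunk: the exported data set can contain one row per stored version of the Group resource, and the old collect()[0] picked whichever row happened to come first. Sorting by meta.versionId and taking the last row selects the newest membership list instead. A plain-Python analogue of that step (hypothetical rows; single-digit versionIds so the string sort matches numeric order):

    group_rows = [
        {'id': 'group-1', 'meta': {'versionId': '2'}, 'member': ['patient-2']},
        {'id': 'group-1', 'meta': {'versionId': '1'}, 'member': ['patient-1']},
    ]
    latest = sorted(group_rows, key=lambda row: row['meta']['versionId'])[-1]
    print(latest['member'])   # ['patient-2']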

print('Extract group member and patient compartment dataframe')
-filtered_group_frame = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))
+filtered_group_frame = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))

+print('Start filtering by transactionTime and Since')
+# Filter by transactionTime and Since
+datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
+filtered_dates_dyn_frame = Filter.apply(frame = filtered_group_frame,
+    f = lambda x:
+    datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
+    datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
+)
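Note on this hunk: the (_since, transactionTime] window now runs after group membership is resolved, not before. With the old ordering, a Group whose own lastUpdated predated _since was filtered out before its member list could be read, so exports of members updated after _since came back empty; that is exactly the scenario the new integration test below covers. A standalone sketch of the window predicate, with hypothetical timestamps:

    from datetime import datetime

    fmt = "%Y-%m-%dT%H:%M:%S.%fZ"
    datetime_since = datetime.strptime("2021-09-01T00:00:00.000Z", fmt)
    datetime_transaction_time = datetime.strptime("2021-09-28T00:00:00.000Z", fmt)

    def in_export_window(resource):
        last_updated = datetime.strptime(resource["meta"]["lastUpdated"], fmt)
        return datetime_since < last_updated <= datetime_transaction_time

    print(in_export_window({"meta": {"lastUpdated": "2021-09-15T12:00:00.000Z"}}))  # True
    print(in_export_window({"meta": {"lastUpdated": "2021-08-31T12:00:00.000Z"}}))  # False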

print('Start filtering by documentStatus and resourceType')
# Filter by resource listed in Type and with correct STATUS
type_list = None if type == None else set(type.split(','))
valid_document_state_to_be_read_from = {'AVAILABLE','LOCKED', 'PENDING_DELETE'}
-filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_group_frame,
+filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_dates_dyn_frame,
f = lambda x:
x["documentStatus"] in valid_document_state_to_be_read_from if type_list is None
else x["documentStatus"] in valid_document_state_to_be_read_from and x["resourceType"] in type_list
57 changes: 29 additions & 28 deletions bulkExport/state-machine-definition.yaml
@@ -5,51 +5,52 @@

id: BulkExportStateMachine
definition:
-Comment: "State machine that executes a FHIR bulk export job"
+Comment: 'State machine that executes a FHIR bulk export job'
StartAt: parallelHelper
States:
catchAllUpdateStatusToFailed:
Type: Task
Resource: !GetAtt updateStatus.Arn
-Parameters: {"jobId.$":"$.jobId", "status": "failed"}
+Parameters: { 'globalParams.$': '$', 'status': 'failed' }
Retry:
-- ErrorEquals: [ "States.ALL" ]
+- ErrorEquals: ['States.ALL']
End: true
parallelHelper:
Type: Parallel
End: true
Catch:
-- ErrorEquals: [ "States.ALL" ]
+- ErrorEquals: ['States.ALL']
Next: catchAllUpdateStatusToFailed
ResultPath: '$.error'
Branches:
- StartAt: startExportJob
States:
updateStatusToFailed:
Type: Task
Resource: !GetAtt updateStatus.Arn
-Parameters: {"globalParams.$":"$", "status": "failed"}
+Parameters: { 'globalParams.$': '$', 'status': 'failed' }
Retry:
-- ErrorEquals: [ "States.ALL" ]
+- ErrorEquals: ['States.ALL']
End: true
updateStatusToCanceled:
Type: Task
Resource: !GetAtt updateStatus.Arn
-Parameters: {"globalParams.$":"$", "status": "canceled"}
+Parameters: { 'globalParams.$': '$', 'status': 'canceled' }
Retry:
-- ErrorEquals: [ "States.ALL" ]
+- ErrorEquals: ['States.ALL']
End: true
updateStatusToCompleted:
Type: Task
Resource: !GetAtt updateStatus.Arn
-Parameters: {"globalParams.$":"$", "status": "completed"}
+Parameters: { 'globalParams.$': '$', 'status': 'completed' }
Retry:
-- ErrorEquals: [ "States.ALL" ]
+- ErrorEquals: ['States.ALL']
End: true
startExportJob:
Type: Task
Resource: !GetAtt startExportJob.Arn
Retry:
-- ErrorEquals: [ "States.ALL" ]
+- ErrorEquals: ['States.ALL']
Next: waitForExportJob
waitForExportJob:
Type: Wait
@@ -59,37 +60,37 @@ definition:
Type: Task
Resource: !GetAtt getJobStatus.Arn
Retry:
-- ErrorEquals: [ "States.ALL" ]
+- ErrorEquals: ['States.ALL']
Next: choiceOnJobStatus
choiceOnJobStatus:
Type: Choice
Choices:
-- Variable: "$.executionParameters.isCanceled"
+- Variable: '$.executionParameters.isCanceled'
BooleanEquals: true
Next: stopExportJob
-- Variable: "$.executionParameters.glueJobRunStatus"
+- Variable: '$.executionParameters.glueJobRunStatus'
StringEquals: 'SUCCEEDED'
Next: updateStatusToCompleted
- Or:
-- Variable: "$.executionParameters.glueJobRunStatus"
-  StringEquals: 'STARTING'
-- Variable: "$.executionParameters.glueJobRunStatus"
-  StringEquals: 'RUNNING'
+- Variable: '$.executionParameters.glueJobRunStatus'
+  StringEquals: 'STARTING'
+- Variable: '$.executionParameters.glueJobRunStatus'
+  StringEquals: 'RUNNING'
Next: waitForExportJob
- Or:
-- Variable: "$.executionParameters.glueJobRunStatus"
-  StringEquals: 'FAILED'
-- Variable: "$.executionParameters.glueJobRunStatus"
-  StringEquals: 'TIMEOUT'
-- Variable: "$.executionParameters.glueJobRunStatus"
-  # STOPPING and STOPPED can only occur here if the job was forcefully stopped with a Glue API call from outside the FHIR server, so we treat it as failure
-  StringEquals: 'STOPPING'
-- Variable: "$.executionParameters.glueJobRunStatus"
-  StringEquals: 'STOPPED'
+- Variable: '$.executionParameters.glueJobRunStatus'
+  StringEquals: 'FAILED'
+- Variable: '$.executionParameters.glueJobRunStatus'
+  StringEquals: 'TIMEOUT'
+- Variable: '$.executionParameters.glueJobRunStatus'
+  # STOPPING and STOPPED can only occur here if the job was forcefully stopped with a Glue API call from outside the FHIR server, so we treat it as failure
+  StringEquals: 'STOPPING'
+- Variable: '$.executionParameters.glueJobRunStatus'
+  StringEquals: 'STOPPED'
Next: updateStatusToFailed
stopExportJob:
Type: Task
Resource: !GetAtt stopExportJob.Arn
Retry:
-- ErrorEquals: [ "States.ALL" ]
+- ErrorEquals: ['States.ALL']
Next: updateStatusToCanceled
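Note on this file: most of the diff is quote-style churn (the YAML appears to have been reformatted to single quotes, which is semantically identical in YAML); the functional change is that catchAllUpdateStatusToFailed now passes { 'globalParams.$': '$' } like the other updateStatus states, handing the whole execution state to updateStatus instead of only the jobId. For orientation, a plain-Python paraphrase of the choiceOnJobStatus routing (an illustration, not part of the commit):

    def next_state(execution_parameters):
        if execution_parameters['isCanceled']:
            return 'stopExportJob'
        status = execution_parameters['glueJobRunStatus']
        if status == 'SUCCEEDED':
            return 'updateStatusToCompleted'
        if status in ('STARTING', 'RUNNING'):
            return 'waitForExportJob'
        # STOPPING/STOPPED mean the Glue job was stopped from outside the FHIR server,
        # so they are routed the same way as FAILED/TIMEOUT
        if status in ('FAILED', 'TIMEOUT', 'STOPPING', 'STOPPED'):
            return 'updateStatusToFailed'
        raise ValueError('unexpected Glue job status: ' + status)

    print(next_state({'isCanceled': False, 'glueJobRunStatus': 'RUNNING'}))  # waitForExportJob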
31 changes: 31 additions & 0 deletions integration-tests/bulkExport.test.ts
@@ -109,6 +109,37 @@ describe('Bulk Export', () => {
);
});

+    test('Successfully export group members last updated after _since timestamp in a group last updated before the _since timestamp', async () => {
+        // BUILD
+        const createdResourceBundleResponse = await bulkExportTestHelper.sendCreateGroupRequest();
+        const resTypToResExpectedInExport = bulkExportTestHelper.getResources(
+            createdResourceBundleResponse,
+            createGroupMembersBundle,
+            true,
+        );
+        // sleep 10 seconds to make tests more resilient to clock skew when running locally.
+        await sleep(10_000);
+        const currentTime = new Date();
+        // Update patient resource so the last updated time is after currentTime
+        const updatedPatientResource = await bulkExportTestHelper.updateResource(
+            resTypToResExpectedInExport.Patient,
+        );
+
+        // OPERATE
+        const groupId = resTypToResExpectedInExport.Group.id;
+        const statusPollUrl = await bulkExportTestHelper.startExportJob({
+            exportType: 'group',
+            groupId,
+            since: currentTime,
+        });
+        const responseBody = await bulkExportTestHelper.getExportStatus(statusPollUrl);
+
+        // CHECK
+        return bulkExportTestHelper.checkResourceInExportedFiles(responseBody.output, {
+            Patient: updatedPatientResource,
+        });
+    });

test('Does not include inactive members in group export', async () => {
// BUILD
const createdResourceBundleResponse = await bulkExportTestHelper.sendCreateGroupRequest({ inactive: true });
13 changes: 9 additions & 4 deletions integration-tests/bulkExportTestHelper.ts
@@ -149,6 +149,13 @@ export default class BulkExportTestHelper {
}
}

+    async updateResource(resource: any) {
+        const resourceToUpdate = cloneDeep(resource);
+        delete resourceToUpdate.meta;
+        const response = await this.fhirUserAxios.put(`/${resource.resourceType}/${resource.id}`, resourceToUpdate);
+        return response.data;
+    }
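Note on this hunk: the helper strips meta before the PUT so the server, not the client, assigns the new versionId and lastUpdated, and the returned body carries that refreshed meta for the test to compare against the export output. A rough Python analogue under assumed names (base_url, auth_headers, and the requests client are illustration-only, not part of the original):

    import copy
    import requests

    def update_resource(base_url, resource, auth_headers):
        # Drop server-managed metadata so the server stamps a fresh versionId/lastUpdated.
        body = copy.deepcopy(resource)
        body.pop('meta', None)
        url = f"{base_url}/{resource['resourceType']}/{resource['id']}"
        response = requests.put(url, json=body, headers=auth_headers)
        response.raise_for_status()
        return response.json()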

// This method does not require FHIR user credentials in the header because the url is an S3 presigned URL
static async downloadFile(url: string): Promise<any[]> {
try {
@@ -189,10 +196,8 @@ export default class BulkExportTestHelper {
if (swapBundleInternalReference) {
let resourcesString = JSON.stringify(resources);
urlToReferenceList.forEach(item => {
-                resourcesString = resourcesString.replace(
-                    `"reference":"${item.url}"`,
-                    `"reference":"${item.reference}"`,
-                );
+                const regEx = new RegExp(`"reference":"${item.url}"`, 'g');
+                resourcesString = resourcesString.replace(regEx, `"reference":"${item.reference}"`);
});
resources = JSON.parse(resourcesString);
}
27 changes: 27 additions & 0 deletions integration-tests/createGroupMembersBundle.json
@@ -83,6 +83,33 @@
"method": "POST",
"url": "Encounter"
}
},
+    {
+      "fullUrl": "urn:uuid:6ad9a6b5-44fb-4eae-9544-a36d5c05c780",
+      "resource": {
+        "resourceType": "Provenance",
+        "target": [
+          {
+            "reference": "Procedure/example/_history/1"
+          },
+          {
+            "reference": "urn:uuid:fcfe413c-c62d-4097-9e31-02ff6ff523ad"
+          }
+        ],
+        "recorded": "2015-06-27T08:39:24+10:00",
+        "agent": [
+          {
+            "who": {
+              "reference": "Practitioner/xcda-author"
+            }
+          }
+        ]
+      },
+      "request":
+        {
+          "method": "POST",
+          "url": "Provenance"
+        }
+    }
]
}
