Skip to content
This repository has been archived by the owner on Apr 13, 2023. It is now read-only.

fix: group export #460

Merged
merged 4 commits into from
Sep 28, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix: group export _since
Bingjiling authored and zheyanyu committed Sep 27, 2021
commit cf78358cc87989c10fae2c11146bd6ed9e6e1a9e
29 changes: 14 additions & 15 deletions bulkExport/glueScripts/export-script.py
Original file line number Diff line number Diff line change
@@ -89,17 +89,6 @@ def remove_composite_id(resource):

filtered_tenant_id_frame = Map.apply(frame = filtered_tenant_id_frame_with_composite_id, f = remove_composite_id)

print('Start filtering by transactionTime and Since')
# Filter by transactionTime and Since
datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")

filtered_dates_dyn_frame = Filter.apply(frame = filtered_tenant_id_frame,
f = lambda x:
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
)

print ('start filtering by group_id')
def is_active_group_member(member, datetime_transaction_time):
if getattr(member, 'inactive', None) == True:
@@ -140,8 +129,10 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
return True
return False

datetime_transaction_time = datetime.strptime(transaction_time, "%Y-%m-%dT%H:%M:%S.%fZ")

if (group_id is None):
filtered_group_frame = filtered_dates_dyn_frame
filtered_group_frame = filtered_tenant_id_frame
else:
print('Loading patient compartment search params')
client = boto3.client('s3')
@@ -150,22 +141,30 @@ def is_included_in_group_export(resource, group_member_ids, group_patient_ids, c
compartment_search_params = json.load(s3Obj['Body'])

print('Extract group member ids')
group_members = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: x['id'] == group_id).toDF().collect()[0]['member']
group_members = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: x['id'] == group_id).toDF().collect()[0]['member']
active_group_member_references = [x['entity']['reference'] for x in group_members if is_active_group_member(x, datetime_transaction_time) and is_internal_reference(x['entity']['reference'], server_url)]
group_member_ids = set([x.split('/')[-1] for x in active_group_member_references])
group_patient_ids = set([x.split('/')[-1] for x in active_group_member_references if x.split('/')[-2] == 'Patient'])
print(group_member_ids)
print(group_patient_ids)

print('Extract group member and patient compartment dataframe')
filtered_group_frame = Filter.apply(frame = filtered_dates_dyn_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))
filtered_group_frame = Filter.apply(frame = filtered_tenant_id_frame, f = lambda x: is_included_in_group_export(x, group_member_ids, group_patient_ids, compartment_search_params, server_url))

print('Start filtering by transactionTime and Since')
# Filter by transactionTime and Since:
# keep only records whose meta.lastUpdated lies in the window
# (since, transaction_time] -- strictly after `since` and at or before
# `transaction_time`.
# Every timestamp is parsed with the same "%Y-%m-%dT%H:%M:%S.%fZ" format, so a
# value without a fractional-seconds part makes strptime raise ValueError.
datetime_since = datetime.strptime(since, "%Y-%m-%dT%H:%M:%S.%fZ")
# The input is filtered_group_frame (the result of the group selection step),
# and datetime_transaction_time is the upper bound parsed earlier in the script.
filtered_dates_dyn_frame = Filter.apply(frame = filtered_group_frame,
f = lambda x:
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") > datetime_since and
datetime.strptime(x["meta"]["lastUpdated"], "%Y-%m-%dT%H:%M:%S.%fZ") <= datetime_transaction_time
)

print('Start filtering by documentStatus and resourceType')
# Filter by resource listed in Type and with correct STATUS
type_list = None if type == None else set(type.split(','))
valid_document_state_to_be_read_from = {'AVAILABLE','LOCKED', 'PENDING_DELETE'}
filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_group_frame,
filtered_dates_resource_dyn_frame = Filter.apply(frame = filtered_dates_dyn_frame,
f = lambda x:
x["documentStatus"] in valid_document_state_to_be_read_from if type_list is None
else x["documentStatus"] in valid_document_state_to_be_read_from and x["resourceType"] in type_list
7 changes: 7 additions & 0 deletions integration-tests/bulkExportTestHelper.ts
Original file line number Diff line number Diff line change
@@ -149,6 +149,13 @@ export default class BulkExportTestHelper {
}
}

/**
 * PUT a copy of `resource` back to `/{resourceType}/{id}` and return the
 * response body.
 *
 * The caller's object is never mutated: a deep clone is made and its `meta`
 * field is removed before sending (presumably so the server assigns fresh
 * metadata such as lastUpdated; confirm against the server's behaviour).
 */
async updateResource(resource: any) {
    const payload = cloneDeep(resource);
    delete payload.meta;
    const targetUrl = `/${resource.resourceType}/${resource.id}`;
    const { data } = await this.fhirUserAxios.put(targetUrl, payload);
    return data;
}

// This method does not require FHIR user credentials in the header because the url is an S3 presigned URL
static async downloadFile(url: string): Promise<any[]> {
try {