Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support fetching resources with same parent table #144 #146

Merged
merged 7 commits into from
Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions batch/src/main/java/org/openmrs/analytics/FhirEtl.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package org.openmrs.analytics;

import java.beans.PropertyVetoException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -135,9 +138,8 @@ private static void fetchSegments(PCollection<SearchSegmentDescriptor> inputSegm
windowedRecords.apply(FileIO.<GenericRecord> write().via(sink).to(outputFile).withSuffix(".parquet")
.withNumShards(options.getNumFileShards()));
// TODO add Avro output option
// apply("WriteToAvro",
// AvroIO.writeGenericRecords(schema).to(outputFile).withSuffix(".avro")
// .withNumShards(options.getNumParquetShards()));
// apply("WriteToAvro", AvroIO.writeGenericRecords(schema).to(outputFile).withSuffix(".avro")
// .withNumShards(options.getNumParquetShards()));
}
if (!options.getOutputJsonPath().isEmpty()) {
PCollection<String> windowedRecords = addWindow(records.get(fetchSearchPageFn.jsonTag),
Expand All @@ -163,7 +165,8 @@ static void runFhirFetch(FhirEtlOptions options, FhirContext fhirContext) throws
EtlUtils.logMetrics(result.metrics());
}

static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) throws Exception {
static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext)
throws PropertyVetoException, IOException, SQLException {
Pipeline pipeline = Pipeline.create(options);
JdbcConnectionUtil jdbcConnectionUtil = new JdbcConnectionUtil(options.getJdbcDriverClass(), options.getJdbcUrl(),
options.getDbUser(), options.getDbPassword(), options.getJdbcMaxPoolSize(),
Expand All @@ -172,12 +175,10 @@ static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) th
JdbcIO.DataSourceConfiguration jdbcConfig = jdbcUtil.getJdbcConfig();
int batchSize = Math.min(options.getBatchSize(), 170); // batch size > 200 will result in HTTP 400 Bad Request
int jdbcFetchSize = options.getJdbcFetchSize();
Map<String, String> reverseMap = jdbcUtil.createFhirReverseMap(options.getSearchList(),
Map<String, ArrayList<String>> reverseMap = jdbcUtil.createFhirReverseMap(options.getSearchList(),
options.getTableFhirMapPath());
// process each table-resource mappings
Map<String, ArrayList<String>> deduplicatedReverseMap = jdbcUtil.deduplicateReverseMap(reverseMap);
for (Map.Entry<String, ArrayList<String>> entry : deduplicatedReverseMap.entrySet()) {

for (Map.Entry<String, ArrayList<String>> entry : reverseMap.entrySet()) {
String tableName = entry.getKey();
int maxId = jdbcUtil.fetchMaxId(tableName);
Map<Integer, Integer> IdRanges = jdbcUtil.createIdRanges(maxId, jdbcFetchSize);
Expand All @@ -194,7 +195,8 @@ static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) th
EtlUtils.logMetrics(result.metrics());
}

public static void main(String[] args) throws Exception {
public static void main(String[] args)
throws CannotProvideCoderException, PropertyVetoException, IOException, SQLException {
// Todo: Autowire
FhirContext fhirContext = FhirContext.forR4();

Expand Down
30 changes: 14 additions & 16 deletions batch/src/main/java/org/openmrs/analytics/JdbcFetchUtil.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,6 +14,7 @@
// limitations under the License.
package org.openmrs.analytics;

import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand All @@ -26,7 +28,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.gson.Gson;
import org.apache.beam.sdk.coders.NullableCoder;
Expand Down Expand Up @@ -167,40 +168,37 @@ public Map<Integer, Integer> createIdRanges(int count, int rangeSize) {
return rangeMap;
}

public Map<String, String> createFhirReverseMap(String searchString, String tableFhirMapPath) throws Exception {
public Map<String, ArrayList<String>> createFhirReverseMap(String searchString, String tableFhirMapPath)
throws IOException {
Gson gson = new Gson();
Path pathToFile = Paths.get(tableFhirMapPath);
try (Reader reader = Files.newBufferedReader(pathToFile.toAbsolutePath(), StandardCharsets.UTF_8)) {
GeneralConfiguration generalConfiguration = gson.fromJson(reader, GeneralConfiguration.class);
Map<String, EventConfiguration> tableToFhirMap = generalConfiguration.getEventConfigurations();
String[] searchList = searchString.split(",");
Map<String, String> reverseMap = new HashMap<String, String>();
Map<String, ArrayList<String>> reverseMap = new HashMap<String, ArrayList<String>>();
for (Map.Entry<String, EventConfiguration> entry : tableToFhirMap.entrySet()) {
Map<String, String> linkTemplate = entry.getValue().getLinkTemplates();
for (String search : searchList) {
if (linkTemplate.containsKey("fhir") && linkTemplate.get("fhir") != null) {
String[] resourceName = linkTemplate.get("fhir").split("/");
ArrayList<String> resources = new ArrayList<String>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this to line 191 (i.e., the else section).

if (resourceName.length >= 1 && resourceName[1].equals(search)) {
if (!reverseMap.containsKey(resourceName[1])) {
reverseMap.put(resourceName[1], entry.getValue().getParentTable());
if (reverseMap.containsKey(entry.getValue().getParentTable())) {
resources = reverseMap.get(entry.getValue().getParentTable());
resources.add(resourceName[1]);
} else {
log.error("Some tables are mapped to the same Resources");
throw new Exception("config file has duplicate Resource mappings");
resources.add(resourceName[1]);
}
reverseMap.put(entry.getValue().getParentTable(), resources);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move this to line 191 as well (i.e., the else section). In the other case, where the map already contains the parent table name, we don't need to create a new list or add it to the map; we just need to update the list that is already in the map.

}
}
}
}
if (reverseMap.size() < searchList.length) {
log.error("Some of the passed FHIR resources are not mapped to any table, please check the config");
}
return reverseMap;
}
}

/**
 * Inverts a resource-to-table map into a table-to-resources map.
 *
 * @param reverseMap map whose keys are FHIR resource names and whose values are table names
 * @return a new {@link HashMap} keyed by table name; each value lists every resource name that was
 *         mapped to that table in {@code reverseMap}
 */
public Map<String, ArrayList<String>> deduplicateReverseMap(Map<String, String> reverseMap) {
	// Group the entries by their table name (the entry value) and keep only the
	// resource names (the entry keys) in each group's list.
	Map<String, ArrayList<String>> resourcesByTable = reverseMap.entrySet().stream()
	        .collect(Collectors.groupingBy(Map.Entry::getValue, HashMap::new,
	            Collectors.mapping(Map.Entry::getKey, Collectors.toCollection(ArrayList::new))));
	return resourcesByTable;
}

}
34 changes: 10 additions & 24 deletions batch/src/test/java/org/openmrs/analytics/JdbcFetchUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,30 +132,16 @@ public void testCreateSearchSegmentDescriptor() {

@Test
public void testCreateFhirReverseMap() throws Exception {
Map<String, String> reverseMap = jdbcFetchUtil.createFhirReverseMap("Patient,Person,Encounter,Observation",
"../utils/dbz_event_to_fhir_config.json");
assertEquals(4, reverseMap.size());
assertEquals(reverseMap.get("Patient"), "person");
assertEquals(reverseMap.get("Person"), "person");
assertEquals(reverseMap.get("Encounter"), "encounter");
assertEquals(reverseMap.get("Observation"), "obs");
Map<String, ArrayList<String>> reverseMap = jdbcFetchUtil
.createFhirReverseMap("Patient,Person,Encounter,Observation", "../utils/dbz_event_to_fhir_config.json");
System.out.println(reverseMap);

Map<String, ArrayList<String>> deduplicatedRerveseMap = jdbcFetchUtil.deduplicateReverseMap(reverseMap);

assertEquals(3, deduplicatedRerveseMap.size());
assertEquals(deduplicatedRerveseMap.size(), 3);
assertEquals(deduplicatedRerveseMap.get("person").size(), 2);
assertTrue(deduplicatedRerveseMap.get("person").contains("Patient"));
assertTrue(deduplicatedRerveseMap.get("person").contains("Person"));
assertTrue(deduplicatedRerveseMap.get("encounter").contains("Encounter"));
assertTrue(deduplicatedRerveseMap.get("obs").contains("Observation"));
}

@Test(expected = Exception.class)
public void createFhirReverseMapShouldThrowErrorOnDuplicateResourceMappings() throws Exception {
Map<String, String> reverseMap = jdbcFetchUtil.createFhirReverseMap("Patient,Person",
"src/test/resources/duplicate_dbz_event_to_fhir_config.json");
assertEquals(4, reverseMap.size());
assertEquals(reverseMap.size(), 4);
assertEquals(reverseMap.get("person").size(), 2);
assertTrue(reverseMap.get("person").contains("Patient"));
assertTrue(reverseMap.get("person").contains("Person"));
assertTrue(reverseMap.get("encounter").contains("Encounter"));
assertTrue(reverseMap.get("visit").contains("Encounter"));
assertTrue(reverseMap.get("obs").contains("Observation"));
}

}
43 changes: 0 additions & 43 deletions batch/src/test/resources/duplicate_dbz_event_to_fhir_config.json

This file was deleted.

10 changes: 1 addition & 9 deletions common/src/main/java/org/openmrs/analytics/FhirStoreUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
import ca.uhn.fhir.rest.client.interceptor.AdditionalRequestHeadersInterceptor;
import ca.uhn.fhir.rest.client.interceptor.BasicAuthInterceptor;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.r4.model.Bundle;
Expand Down Expand Up @@ -92,14 +91,7 @@ public Collection<MethodOutcome> uploadBundle(Bundle bundle) {
interceptors = Collections.singleton(new BasicAuthInterceptor(sinkUsername, sinkPassword));
}

Collection<MethodOutcome> responses;
try {
responses = uploadBundle(sinkUrl, bundle, interceptors);
}
catch (ResourceVersionConflictException e) {
responses = uploadBundle(sinkUrl, bundle, interceptors);
}
return responses;
return uploadBundle(sinkUrl, bundle, interceptors);
}

protected IGenericClient createGenericClient(String sinkUrl, Collection<IClientInterceptor> interceptors) {
Expand Down
5 changes: 3 additions & 2 deletions utils/dbz_event_to_fhir_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@
"title": "Visit",
"parentTable": "visit",
"linkTemplates": {
"rest": "/ws/rest/v1/visit/{uuid}?v=full"
"rest": "/ws/rest/v1/visit/{uuid}?v=full",
"fhir": "/Encounter/{uuid}"
}
},
"program":{
Expand Down Expand Up @@ -242,4 +243,4 @@
}
}
}
}
}