Skip to content

Commit

Permalink
fetchresources with the same parent table
Browse files Browse the repository at this point in the history
  • Loading branch information
mozzy11 committed Apr 9, 2021
1 parent 01a3bf2 commit 057e9f8
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 103 deletions.
20 changes: 11 additions & 9 deletions batch/src/main/java/org/openmrs/analytics/FhirEtl.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package org.openmrs.analytics;

import java.beans.PropertyVetoException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -135,9 +138,8 @@ private static void fetchSegments(PCollection<SearchSegmentDescriptor> inputSegm
windowedRecords.apply(FileIO.<GenericRecord> write().via(sink).to(outputFile).withSuffix(".parquet")
.withNumShards(options.getNumFileShards()));
// TODO add Avro output option
// apply("WriteToAvro",
// AvroIO.writeGenericRecords(schema).to(outputFile).withSuffix(".avro")
// .withNumShards(options.getNumParquetShards()));
// apply("WriteToAvro", AvroIO.writeGenericRecords(schema).to(outputFile).withSuffix(".avro")
// .withNumShards(options.getNumParquetShards()));
}
if (!options.getOutputJsonPath().isEmpty()) {
PCollection<String> windowedRecords = addWindow(records.get(fetchSearchPageFn.jsonTag),
Expand All @@ -163,7 +165,8 @@ static void runFhirFetch(FhirEtlOptions options, FhirContext fhirContext) throws
EtlUtils.logMetrics(result.metrics());
}

static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) throws Exception {
static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext)
throws PropertyVetoException, IOException, SQLException {
Pipeline pipeline = Pipeline.create(options);
JdbcConnectionUtil jdbcConnectionUtil = new JdbcConnectionUtil(options.getJdbcDriverClass(), options.getJdbcUrl(),
options.getDbUser(), options.getDbPassword(), options.getJdbcMaxPoolSize(),
Expand All @@ -172,12 +175,10 @@ static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) th
JdbcIO.DataSourceConfiguration jdbcConfig = jdbcUtil.getJdbcConfig();
int batchSize = Math.min(options.getBatchSize(), 170); // batch size > 200 will result in HTTP 400 Bad Request
int jdbcFetchSize = options.getJdbcFetchSize();
Map<String, String> reverseMap = jdbcUtil.createFhirReverseMap(options.getSearchList(),
Map<String, ArrayList<String>> reverseMap = jdbcUtil.createFhirReverseMap(options.getSearchList(),
options.getTableFhirMapPath());
// process each table-resource mappings
Map<String, ArrayList<String>> deduplicatedReverseMap = jdbcUtil.deduplicateReverseMap(reverseMap);
for (Map.Entry<String, ArrayList<String>> entry : deduplicatedReverseMap.entrySet()) {

for (Map.Entry<String, ArrayList<String>> entry : reverseMap.entrySet()) {
String tableName = entry.getKey();
int maxId = jdbcUtil.fetchMaxId(tableName);
Map<Integer, Integer> IdRanges = jdbcUtil.createIdRanges(maxId, jdbcFetchSize);
Expand All @@ -194,7 +195,8 @@ static void runFhirJdbcFetch(FhirEtlOptions options, FhirContext fhirContext) th
EtlUtils.logMetrics(result.metrics());
}

public static void main(String[] args) throws Exception {
public static void main(String[] args)
throws CannotProvideCoderException, PropertyVetoException, IOException, SQLException {
// Todo: Autowire
FhirContext fhirContext = FhirContext.forR4();

Expand Down
30 changes: 14 additions & 16 deletions batch/src/main/java/org/openmrs/analytics/JdbcFetchUtil.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,6 +14,7 @@
// limitations under the License.
package org.openmrs.analytics;

import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand All @@ -26,7 +28,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.google.gson.Gson;
import org.apache.beam.sdk.coders.NullableCoder;
Expand Down Expand Up @@ -167,40 +168,37 @@ public Map<Integer, Integer> createIdRanges(int count, int rangeSize) {
return rangeMap;
}

public Map<String, String> createFhirReverseMap(String searchString, String tableFhirMapPath) throws Exception {
public Map<String, ArrayList<String>> createFhirReverseMap(String searchString, String tableFhirMapPath)
throws IOException {
Gson gson = new Gson();
Path pathToFile = Paths.get(tableFhirMapPath);
try (Reader reader = Files.newBufferedReader(pathToFile.toAbsolutePath(), StandardCharsets.UTF_8)) {
GeneralConfiguration generalConfiguration = gson.fromJson(reader, GeneralConfiguration.class);
Map<String, EventConfiguration> tableToFhirMap = generalConfiguration.getEventConfigurations();
String[] searchList = searchString.split(",");
Map<String, String> reverseMap = new HashMap<String, String>();
Map<String, ArrayList<String>> reverseMap = new HashMap<String, ArrayList<String>>();
for (Map.Entry<String, EventConfiguration> entry : tableToFhirMap.entrySet()) {
Map<String, String> linkTemplate = entry.getValue().getLinkTemplates();
for (String search : searchList) {
if (linkTemplate.containsKey("fhir") && linkTemplate.get("fhir") != null) {
String[] resourceName = linkTemplate.get("fhir").split("/");
ArrayList<String> resources = new ArrayList<String>();
if (resourceName.length >= 1 && resourceName[1].equals(search)) {
if (!reverseMap.containsKey(resourceName[1])) {
reverseMap.put(resourceName[1], entry.getValue().getParentTable());
if (reverseMap.containsKey(entry.getValue().getParentTable())) {
resources = reverseMap.get(entry.getValue().getParentTable());
resources.add(resourceName[1]);
} else {
log.error("Some tables are mapped to the same Resources");
throw new Exception("config file has duplicate Resource mappings");
resources.add(resourceName[1]);
}
reverseMap.put(entry.getValue().getParentTable(), resources);
}
}
}
}
if (reverseMap.size() < searchList.length) {
log.error("Some of the passed FHIR resources are not mapped to any table, please check the config");
}
return reverseMap;
}
}

public Map<String, ArrayList<String>> deduplicateReverseMap(Map<String, String> reverseMap) {
Map<String, ArrayList<String>> deduplicatedReverseMap = new HashMap<>(
reverseMap.entrySet().stream().collect(Collectors.groupingBy(Map.Entry::getValue)).values().stream()
.collect(Collectors.toMap(item -> item.get(0).getValue(),
item -> new ArrayList<>(item.stream().map(Map.Entry::getKey).collect(Collectors.toList())))));
return deduplicatedReverseMap;
}

}
34 changes: 10 additions & 24 deletions batch/src/test/java/org/openmrs/analytics/JdbcFetchUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,30 +132,16 @@ public void testCreateSearchSegmentDescriptor() {

@Test
public void testCreateFhirReverseMap() throws Exception {
Map<String, String> reverseMap = jdbcFetchUtil.createFhirReverseMap("Patient,Person,Encounter,Observation",
"../utils/dbz_event_to_fhir_config.json");
assertEquals(4, reverseMap.size());
assertEquals(reverseMap.get("Patient"), "person");
assertEquals(reverseMap.get("Person"), "person");
assertEquals(reverseMap.get("Encounter"), "encounter");
assertEquals(reverseMap.get("Observation"), "obs");
Map<String, ArrayList<String>> reverseMap = jdbcFetchUtil
.createFhirReverseMap("Patient,Person,Encounter,Observation", "../utils/dbz_event_to_fhir_config.json");
System.out.println(reverseMap);

Map<String, ArrayList<String>> deduplicatedRerveseMap = jdbcFetchUtil.deduplicateReverseMap(reverseMap);

assertEquals(3, deduplicatedRerveseMap.size());
assertEquals(deduplicatedRerveseMap.size(), 3);
assertEquals(deduplicatedRerveseMap.get("person").size(), 2);
assertTrue(deduplicatedRerveseMap.get("person").contains("Patient"));
assertTrue(deduplicatedRerveseMap.get("person").contains("Person"));
assertTrue(deduplicatedRerveseMap.get("encounter").contains("Encounter"));
assertTrue(deduplicatedRerveseMap.get("obs").contains("Observation"));
}

@Test(expected = Exception.class)
public void createFhirReverseMapShouldThrowErrorOnDuplicateResourceMappings() throws Exception {
Map<String, String> reverseMap = jdbcFetchUtil.createFhirReverseMap("Patient,Person",
"src/test/resources/duplicate_dbz_event_to_fhir_config.json");
assertEquals(4, reverseMap.size());
assertEquals(reverseMap.size(), 4);
assertEquals(reverseMap.get("person").size(), 2);
assertTrue(reverseMap.get("person").contains("Patient"));
assertTrue(reverseMap.get("person").contains("Person"));
assertTrue(reverseMap.get("encounter").contains("Encounter"));
assertTrue(reverseMap.get("visit").contains("Encounter"));
assertTrue(reverseMap.get("obs").contains("Observation"));
}

}
43 changes: 0 additions & 43 deletions batch/src/test/resources/duplicate_dbz_event_to_fhir_config.json

This file was deleted.

10 changes: 1 addition & 9 deletions common/src/main/java/org/openmrs/analytics/FhirStoreUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import ca.uhn.fhir.rest.client.api.IRestfulClientFactory;
import ca.uhn.fhir.rest.client.interceptor.AdditionalRequestHeadersInterceptor;
import ca.uhn.fhir.rest.client.interceptor.BasicAuthInterceptor;
import ca.uhn.fhir.rest.server.exceptions.ResourceVersionConflictException;
import org.apache.commons.lang3.StringUtils;
import org.hl7.fhir.instance.model.api.IBaseOperationOutcome;
import org.hl7.fhir.r4.model.Bundle;
Expand Down Expand Up @@ -92,14 +91,7 @@ public Collection<MethodOutcome> uploadBundle(Bundle bundle) {
interceptors = Collections.singleton(new BasicAuthInterceptor(sinkUsername, sinkPassword));
}

Collection<MethodOutcome> responses;
try {
responses = uploadBundle(sinkUrl, bundle, interceptors);
}
catch (ResourceVersionConflictException e) {
responses = uploadBundle(sinkUrl, bundle, interceptors);
}
return responses;
return uploadBundle(sinkUrl, bundle, interceptors);
}

protected IGenericClient createGenericClient(String sinkUrl, Collection<IClientInterceptor> interceptors) {
Expand Down
5 changes: 3 additions & 2 deletions utils/dbz_event_to_fhir_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@
"title": "Visit",
"parentTable": "visit",
"linkTemplates": {
"rest": "/ws/rest/v1/visit/{uuid}?v=full"
"rest": "/ws/rest/v1/visit/{uuid}?v=full",
"fhir": "/Encounter/{uuid}"
}
},
"program":{
Expand Down Expand Up @@ -242,4 +243,4 @@
}
}
}
}
}

0 comments on commit 057e9f8

Please sign in to comment.