Skip to content

Commit

Permalink
Implementation of --activePeriod feature. (#152)
Browse files Browse the repository at this point in the history
First part of #128
  • Loading branch information
bashir2 authored Apr 14, 2021
1 parent a65f482 commit a540b5f
Show file tree
Hide file tree
Showing 17 changed files with 772 additions and 159 deletions.
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ $ java -cp batch/target/fhir-batch-etl-bundled-0.1.0-SNAPSHOT.jar \
--fhirSinkPath=http://localhost:8098/fhir \
--sinkUserName=hapi --sinkPassword=hapi \
--outputParquetPath=/tmp/TEST/ \
--searchList=Patient,Encounter,Observation --batchSize=20
--resourceList=Patient,Encounter,Observation --batchSize=20
```

Parameters:

- `searchList` - A comma-separated list of
[FHIR Search](https://www.hl7.org/fhir/search.html) URLs. For example,
- `resourceList` - A comma-separated list of
[FHIR resource](https://www.hl7.org/fhir/resourcelist.html) types or search URLs. For example,
`Patient?given=Susan` will extract only Patient resources that meet the
`given=Susan` criteria. Default: `Patient,Encounter,Observation`
- `batchSize` - The number of resources to fetch in each API call.
Expand All @@ -154,7 +154,7 @@ $ java -cp batch/target/fhir-batch-etl-bundled-0.1.0-SNAPSHOT.jar \
--fhirSinkPath=http://localhost:8098/fhir \
--sinkUserName=hapi --sinkPassword=hapi \
--outputParquetPath=/tmp/TEST/ \
--searchList=Patient,Encounter,Observation --batchSize=20 \
--resourceList=Patient,Encounter,Observation --batchSize=20 \
--jdbcModeEnabled=true --jdbcUrl=jdbc:mysql://localhost:3306/openmrs \
--dbUser=root --dbPassword=debezium --jdbcMaxPoolSize=50 \
--jdbcDriverClass=com.mysql.cj.jdbc.Driver
Expand All @@ -163,7 +163,7 @@ $ java -cp batch/target/fhir-batch-etl-bundled-0.1.0-SNAPSHOT.jar \

Parameters:

- `searchList` - A comma-separated list of FHIR Resources to be fetched from
- `resourceList` - A comma-separated list of FHIR Resources to be fetched from
OpenMRS. Default: `Patient,Encounter,Observation`
- `batchSize` - The number of resources to fetch in each API call. Default:
`100`
Expand Down
4 changes: 2 additions & 2 deletions batch/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ENV OPENMRS_PASSWORD="Admin123"
ENV SINK_PATH=""
ENV SINK_USERNAME=""
ENV SINK_PASSWORD=""
ENV SEARCH_LIST="Patient,Encounter,Observation"
ENV RESOURCE_LIST="Patient,Encounter,Observation"
ENV BATCH_SIZE=10
ENV TARGET_PARALLELISM=10
ENV PARQUET_PATH="/tmp/"
Expand All @@ -44,7 +44,7 @@ VOLUME [ "${WORK_DIR}" ]

ENTRYPOINT java -cp app.jar org.openmrs.analytics.FhirEtl \
--openmrsServerUrl=${OPENMRS_URL} --openmrsUserName=${OPENMRS_USERNAME} --openmrsPassword=${OPENMRS_PASSWORD} \
--searchList=${SEARCH_LIST} --batchSize=${BATCH_SIZE} --targetParallelism=${TARGET_PARALLELISM} \
--resourceList=${RESOURCE_LIST} --batchSize=${BATCH_SIZE} --targetParallelism=${TARGET_PARALLELISM} \
--fhirSinkPath=${SINK_PATH} --sinkUserName=${SINK_USERNAME} --sinkPassword=${SINK_PASSWORD} \
--outputParquetPath=${PARQUET_PATH} --jdbcModeEnabled=${JDBC_MODE_ENABLED} --jdbcDriverClass=${JDBC_DRIVER_CLASS} \
--dbUser=${DB_USER} --dbPassword=${DB_PASSWORD} --jdbcMaxPoolSize=${JDBC_MAX_POOL_SIZE} --jdbcUrl=${JDBC_URL} \
Expand Down
84 changes: 84 additions & 0 deletions batch/src/main/java/org/openmrs/analytics/FetchPatientHistory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.openmrs.analytics;

import java.util.List;

import ca.uhn.fhir.rest.api.SummaryEnum;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.hl7.fhir.r4.model.Bundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Beam transform that, for every incoming patient ID, fetches resources of a single type for that
 * patient and forwards each returned bundle to {@code processBundle} of the underlying
 * {@link FetchSearchPageFn}.
 *
 * <p>Input elements are {@code KV<patientId, count>} pairs; only the key is read here. The output is
 * a {@link PCollectionTuple} whose main output is tagged with the function's {@code avroTag} and whose
 * additional output is tagged with its {@code jsonTag}.
 */
public class FetchPatientHistory extends PTransform<PCollection<KV<String, Integer>>, PCollectionTuple> {

  private static final Logger log = LoggerFactory.getLogger(FetchPatientHistory.class);

  private final FetchSearchPageFn<KV<String, Integer>> fetchSearchPageFn;

  private final Schema schema;

  /**
   * @param options pipeline options; {@code activePeriod} must be non-empty and {@code batchSize} is
   *     used as the page size of every search
   * @param resourceType the FHIR resource type to fetch for each patient
   * @param schema the Avro schema used for the coder of the Avro output
   * @throws IllegalStateException if {@code activePeriod} is empty
   */
  FetchPatientHistory(FhirEtlOptions options, String resourceType, Schema schema) {
    Preconditions.checkState(!options.getActivePeriod().isEmpty());
    this.schema = schema;

    final List<String> activeRange = FhirSearchUtil.getDateRange(options.getActivePeriod());
    final int pageSize = options.getBatchSize();
    final String stageName = resourceType + "_history";

    fetchSearchPageFn = new FetchSearchPageFn<KV<String, Integer>>(options, stageName) {

      /** Fetches the first page for the patient, then follows "next" links until none is left. */
      @ProcessElement
      public void ProcessElement(@Element KV<String, Integer> patientIdCount, MultiOutputReceiver out) {
        String patientId = patientIdCount.getKey();
        log.info(String.format("Fetching historical %s resources for patient %s", resourceType, patientId));

        // The first element of the date range is passed as the date argument of the search.
        // NOTE(review): presumably this is the start of the active period -- confirm against
        // FhirSearchUtil.getDateRange.
        Bundle page = this.fhirSearchUtil.searchByPatientAndLastDate(resourceType, patientId,
            activeRange.get(0), pageSize);
        processBundle(page, out);

        for (String next = this.fhirSearchUtil.getNextUrl(page); next != null;
            next = this.fhirSearchUtil.getNextUrl(page)) {
          page = this.fhirSearchUtil.searchByUrl(next, pageSize, SummaryEnum.DATA);
          processBundle(page, out);
        }
      }
    };
  }

  /**
   * Applies the fetch function to the patient IDs and sets an Avro coder, built from the schema
   * given at construction, on the Avro output.
   */
  @Override
  public PCollectionTuple expand(PCollection<KV<String, Integer>> patientIdCounts) {
    TupleTagList additionalTags = TupleTagList.of(Lists.newArrayList(fetchSearchPageFn.jsonTag));
    PCollectionTuple outputs = patientIdCounts
        .apply(ParDo.of(fetchSearchPageFn).withOutputTags(fetchSearchPageFn.avroTag, additionalTags));
    outputs.get(fetchSearchPageFn.avroTag).setCoder(AvroCoder.of(GenericRecord.class, schema));
    return outputs;
  }

  /** Delegates to the underlying function to extract the Avro records from {@code records}. */
  public PCollection<GenericRecord> getAvroRecords(PCollectionTuple records) {
    return fetchSearchPageFn.getAvroRecords(records);
  }

  /** Delegates to the underlying function to extract the JSON records from {@code records}. */
  public PCollection<String> getJsonRecords(PCollectionTuple records) {
    return fetchSearchPageFn.getJsonRecords(records);
  }

}
80 changes: 80 additions & 0 deletions batch/src/main/java/org/openmrs/analytics/FetchPatients.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.openmrs.analytics;

import java.util.List;

import ca.uhn.fhir.rest.api.SummaryEnum;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.hl7.fhir.r4.model.Bundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Beam transform that fetches the Patient resource for each incoming patient ID.
 *
 * <p>Input elements are {@code KV<patientId, count>} pairs; the count is only used for logging. For
 * each element a {@code Patient?_id=<patientId>} search is issued through {@code fhirSearchUtil} and
 * the returned bundle is handed to {@code processBundle} of the underlying {@link FetchSearchPageFn}.
 * The output is a {@link PCollectionTuple} whose main output is tagged with the function's
 * {@code avroTag} and whose additional output is tagged with its {@code jsonTag}.
 */
public class FetchPatients extends PTransform<PCollection<KV<String, Integer>>, PCollectionTuple> {

  // The logger is registered under this class so that log lines are attributed correctly.
  private static final Logger log = LoggerFactory.getLogger(FetchPatients.class);

  private static final String PATIENT = "Patient";

  private final FetchSearchPageFn<KV<String, Integer>> fetchSearchPageFn;

  private final Schema schema;

  /**
   * @param options pipeline options; {@code activePeriod} must be non-empty and {@code batchSize} is
   *     passed as the count of every search
   * @param schema the Avro schema used for the coder of the Avro output
   * @throws IllegalStateException if {@code activePeriod} is empty
   */
  FetchPatients(FhirEtlOptions options, Schema schema) {
    Preconditions.checkState(!options.getActivePeriod().isEmpty());
    // The parsed range is not needed by this transform. The call is kept because it may reject a
    // malformed activePeriod at construction time -- TODO confirm and drop if it does not.
    FhirSearchUtil.getDateRange(options.getActivePeriod());

    int count = options.getBatchSize();
    this.schema = schema;

    fetchSearchPageFn = new FetchSearchPageFn<KV<String, Integer>>(options, "PatientById") {

      /** Searches for the Patient with the given ID and forwards the resulting bundle. */
      @ProcessElement
      public void ProcessElement(@Element KV<String, Integer> patientIdCount, MultiOutputReceiver out) {
        String patientId = patientIdCount.getKey();
        log.info("Already fetched {} resources for patient {}", patientIdCount.getValue(), patientId);
        // TODO use openmrsUtil.fetchResource() instead of search and process bundle.
        Bundle bundle = this.fhirSearchUtil.searchByUrl(PATIENT + "?_id=" + patientId, count, SummaryEnum.DATA);
        processBundle(bundle, out);
      }
    };
  }

  /**
   * Applies the fetch function to the patient IDs and sets an Avro coder, built from the schema
   * given at construction, on the Avro output.
   */
  @Override
  public PCollectionTuple expand(PCollection<KV<String, Integer>> patientIdCounts) {
    PCollectionTuple records = patientIdCounts.apply(ParDo.of(fetchSearchPageFn)
        .withOutputTags(fetchSearchPageFn.avroTag, TupleTagList.of(Lists.newArrayList(fetchSearchPageFn.jsonTag))));
    records.get(fetchSearchPageFn.avroTag).setCoder(AvroCoder.of(GenericRecord.class, schema));
    return records;
  }

  /** Delegates to the underlying function to extract the Avro records from {@code records}. */
  public PCollection<GenericRecord> getAvroRecords(PCollectionTuple records) {
    return fetchSearchPageFn.getAvroRecords(records);
  }

  /** Delegates to the underlying function to extract the JSON records from {@code records}. */
  public PCollection<String> getJsonRecords(PCollectionTuple records) {
    return fetchSearchPageFn.getJsonRecords(records);
  }

}
133 changes: 133 additions & 0 deletions batch/src/main/java/org/openmrs/analytics/FetchResources.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.openmrs.analytics;

import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import ca.uhn.fhir.rest.api.SummaryEnum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.hl7.fhir.r4.model.Base;
import org.hl7.fhir.r4.model.Bundle;
import org.hl7.fhir.r4.model.Bundle.BundleEntryComponent;
import org.hl7.fhir.r4.model.Property;
import org.hl7.fhir.r4.model.Reference;
import org.hl7.fhir.r4.model.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This function object fetches all input segments from the FHIR source and converts each resource
 * to JSON or Avro (or both). If a FHIR sink address is provided, those resources are passed to that
 * FHIR sink too.
 *
 * <p>In addition, for every fetched resource whose {@code subject} is a reference to a Patient, a
 * {@code KV<patientId, 1>} pair is emitted on a separate output; see
 * {@link #getPatientIds(PCollectionTuple)}.
 */
// TODO: Add unit-tests for this class
public class FetchResources extends PTransform<PCollection<SearchSegmentDescriptor>, PCollectionTuple> {

  private static final Logger log = LoggerFactory.getLogger(FetchResources.class);

  /** Matches references that end in {@code Patient/<id>}; group 1 is the ID. */
  private static final Pattern PATIENT_REFERENCE = Pattern.compile(".*Patient/([^/]+)");

  private final Schema schema;

  private final FetchSearchPageFn<SearchSegmentDescriptor> fetchSearchPageFn;

  /** Tag of the additional output that carries {@code KV<patientId, 1>} pairs. */
  private final TupleTag<KV<String, Integer>> patientIdTag = new TupleTag<KV<String, Integer>>() {};

  /**
   * @param options pipeline options, passed through to the underlying {@link FetchSearchPageFn}
   * @param stageIdentifier identifier of this stage, passed through to the underlying function and
   *     included in log messages
   * @param schema the Avro schema used for the coder of the Avro output
   */
  FetchResources(FhirEtlOptions options, String stageIdentifier, Schema schema) {
    this.schema = schema;
    fetchSearchPageFn = new FetchSearchPageFn<SearchSegmentDescriptor>(options, stageIdentifier) {

      /**
       * Fetches one search segment, records how long the fetch took, forwards the bundle, and emits
       * the patient ID of every entry whose subject is a Patient reference.
       */
      @ProcessElement
      public void ProcessElement(@Element SearchSegmentDescriptor segment, MultiOutputReceiver out) {
        String searchUrl = segment.searchUrl();
        // Only the first 200 characters of the URL are logged.
        log.info("Fetching {} resources for stage {}; URL= {}", segment.count(), this.stageIdentifier,
            searchUrl.substring(0, Math.min(200, searchUrl.length())));
        long fetchStartTime = System.currentTimeMillis();
        Bundle pageBundle = fhirSearchUtil.searchByUrl(searchUrl, segment.count(), SummaryEnum.DATA);
        addFetchTime(System.currentTimeMillis() - fetchStartTime);
        processBundle(pageBundle, out);
        if (pageBundle != null && pageBundle.getEntry() != null) {
          for (BundleEntryComponent entry : pageBundle.getEntry()) {
            String patientId = getSubjectPatientIdOrNull(entry.getResource());
            if (patientId != null) {
              out.get(patientIdTag).output(KV.of(patientId, 1));
            }
          }
        }
      }
    };

  }

  /**
   * Extracts the patient ID from the {@code subject} of the given resource.
   *
   * @param resource the resource to inspect; may be null
   * @return the ID captured from a subject reference of the form {@code .../Patient/<id>}, or null
   *     when the resource is null, has no {@code subject} property, has no or multiple subject
   *     values, has a subject that is not a {@link Reference}, or has a subject whose reference
   *     string is missing or does not point to a Patient
   */
  @VisibleForTesting
  static String getSubjectPatientIdOrNull(Resource resource) {
    if (resource == null) {
      // A bundle entry is not guaranteed to carry a resource.
      return null;
    }
    Property subject = resource.getNamedProperty("subject");
    if (subject == null) {
      return null;
    }
    List<Base> values = subject.getValues();
    if (values.size() > 1) {
      log.warn("Unexpected multiple values for subject of {} with id {}", resource.getResourceType(),
          resource.getId());
      return null;
    }
    if (values.isEmpty()) {
      return null;
    }
    Base value = values.get(0);
    if (!(value instanceof Reference)) {
      // Some resource types allow a non-reference subject (e.g., a CodeableConcept); skip those
      // instead of failing the whole bundle with a ClassCastException.
      log.warn("Ignoring subject of {} with id {} because it is not a Reference", resource.getResourceType(),
          resource.getId());
      return null;
    }
    Reference reference = (Reference) value;
    // TODO: Find a more generic way to check if this is a reference to a Patient. With the
    // current OpenMRS setup, reference.getType() is null so we cannot rely on that.
    String refStr = reference.getReference();
    String patientId = null;
    // The reference string is absent when the Reference only carries an identifier or a display.
    if (refStr != null) {
      Matcher matcher = PATIENT_REFERENCE.matcher(refStr);
      if (matcher.matches()) {
        patientId = matcher.group(1);
      }
    }
    if (patientId == null) {
      log.warn("Ignoring subject of {} with id {} because it is not a Patient reference: {}",
          resource.getResourceType(), resource.getId(), refStr);
    }
    return patientId;
  }

  /**
   * Applies the fetch function to the segments, declaring the JSON and patient-ID outputs as
   * additional outputs, and sets an Avro coder built from the schema on the Avro output.
   */
  @Override
  public PCollectionTuple expand(PCollection<SearchSegmentDescriptor> segments) {
    PCollectionTuple records = segments.apply(ParDo.of(fetchSearchPageFn).withOutputTags(fetchSearchPageFn.avroTag,
        TupleTagList.of(Lists.newArrayList(fetchSearchPageFn.jsonTag, patientIdTag))));
    records.get(fetchSearchPageFn.avroTag).setCoder(AvroCoder.of(GenericRecord.class, schema));
    return records;
  }

  /** Delegates to the underlying function to extract the Avro records from {@code records}. */
  public PCollection<GenericRecord> getAvroRecords(PCollectionTuple records) {
    return fetchSearchPageFn.getAvroRecords(records);
  }

  /** Delegates to the underlying function to extract the JSON records from {@code records}. */
  public PCollection<String> getJsonRecords(PCollectionTuple records) {
    return fetchSearchPageFn.getJsonRecords(records);
  }

  /** Returns the {@code KV<patientId, 1>} pairs emitted while fetching, one per matching resource. */
  public PCollection<KV<String, Integer>> getPatientIds(PCollectionTuple records) {
    return records.get(patientIdTag);
  }
}
Loading

0 comments on commit a540b5f

Please sign in to comment.