Commit 0c2ded4

Merge branch 'master' into issue-144

mozzy11 authored Mar 28, 2021
2 parents ddbf080 + 17c1a28
Showing 6 changed files with 38 additions and 14 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -221,7 +221,7 @@ resume from the last processed offset.
--fhirSinkPath=http://localhost:8098/fhir \
--sinkUserName=hapi --sinkPassword=hapi \
--outputParquetPath=/tmp/TEST/ \
- --fhirDebeziumEventConfigPath=./utils/dbz_event_to_fhir_config.json \
+ --fhirDebeziumConfigPath=./utils/dbz_event_to_fhir_config.json \
--openmrsfhirBaseEndpoint=/ws/fhir2/R4"
```

@@ -232,7 +232,7 @@ resume from the last processed offset.
-Dexec.args="--openmrsServerUrl=http://localhost:8099/openmrs \
--openmrsUserName=admin --openmrsPassword=Admin123 \
--fhirSinkPath=projects/PROJECT/locations/LOCATION/datasets/DATASET/fhirStores/FHIRSTORENAME \
- --fhirDebeziumEventConfigPath=./utils/dbz_event_to_fhir_config.json \
+ --fhirDebeziumConfigPath=./utils/dbz_event_to_fhir_config.json \
--openmrsfhirBaseEndpoint=/ws/fhir2/R4"
```

@@ -646,7 +646,7 @@ for streaming
mvn compile exec:java -pl streaming-binlog \
-Dexec.args="--databaseHostName=localhost \
--databasePort=3306 --databaseUser=root --databasePassword=debezium \
- --databaseName=mysql --databaseSchema=openmrs --databaseServerId=77 \
+ --databaseServerName=mysql --databaseSchema=openmrs --databaseServerId=77 \
--openmrsUserName=admin --openmrsPassword=Admin123 \
--openmrsServerUrl=http://localhost:8099/openmrs \
--fhirSinkPath=http://localhost:5001/fhir \
28 changes: 25 additions & 3 deletions batch/src/main/java/org/openmrs/analytics/FhirEtl.java
@@ -34,10 +34,15 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+ import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+ import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTagList;
import org.hl7.fhir.r4.model.Bundle;
+ import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -99,6 +104,19 @@ static String findSearchedResource(String search) {
return search;
}

+ static <T> PCollection<T> addWindow(PCollection<T> records, int secondsToFlush) {
+   if (secondsToFlush <= 0) {
+     return records;
+   }
+   // We are dealing with a bounded source, hence we can simply use the single global window.
+   // We also don't need to deal with late data.
+   // TODO: Implement an easy way to unit-test this functionality.
+   return records.apply(Window.<T> into(new GlobalWindows())
+       .triggering(Repeatedly.forever(
+           AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(secondsToFlush))))
+       .discardingFiredPanes());
+ }

private static void fetchSegments(PCollection<SearchSegmentDescriptor> inputSegments, String search,
FhirEtlOptions options) {
String resourceType = findSearchedResource(search);
@@ -109,18 +127,22 @@ private static void fetchSegments(PCollection<SearchSegmentDescriptor> inputSegm
TupleTagList.of(fetchSearchPageFn.jsonTag)));
records.get(fetchSearchPageFn.avroTag).setCoder(AvroCoder.of(GenericRecord.class, schema));
if (!options.getOutputParquetPath().isEmpty()) {
+   PCollection<GenericRecord> windowedRecords = addWindow(records.get(fetchSearchPageFn.avroTag),
+       options.getSecondsToFlushFiles());
// TODO: Make sure getOutputParquetPath() is a directory.
String outputFile = options.getOutputParquetPath() + resourceType;
ParquetIO.Sink sink = ParquetIO.sink(schema); // TODO add an option for .withCompressionCodec();
-   records.get(fetchSearchPageFn.avroTag).apply(FileIO.<GenericRecord> write().via(sink).to(outputFile)
-       .withSuffix(".parquet").withNumShards(options.getNumFileShards()));
+   windowedRecords.apply(FileIO.<GenericRecord> write().via(sink).to(outputFile).withSuffix(".parquet")
+       .withNumShards(options.getNumFileShards()));
// TODO add Avro output option
// apply("WriteToAvro",
// AvroIO.writeGenericRecords(schema).to(outputFile).withSuffix(".avro")
// .withNumShards(options.getNumParquetShards()));
}
if (!options.getOutputJsonPath().isEmpty()) {
-   records.get(fetchSearchPageFn.jsonTag).apply("WriteToText",
+   PCollection<String> windowedRecords = addWindow(records.get(fetchSearchPageFn.jsonTag),
+       options.getSecondsToFlushFiles());
+   windowedRecords.apply("WriteToText",
TextIO.write().to(options.getOutputJsonPath() + resourceType).withSuffix(".txt"));
}
}
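The new addWindow() helper is what lets the Parquet/text sinks above flush periodically instead of only at pipeline completion. A minimal standalone sketch of the same pattern follows; the class name, input data, and 5-second delay are illustrative, not part of this commit:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class WindowSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<String> records = pipeline.apply(Create.of("a", "b", "c"));
    // Same pattern as addWindow(): keep the single global window of the bounded
    // source, but re-fire its pane 5 seconds (processing time) after the first
    // element of the pane arrives, discarding elements emitted in earlier panes.
    PCollection<String> windowed = records.apply(Window.<String> into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
        .discardingFiredPanes());
    // A FileIO/TextIO sink applied to `windowed` can now write a batch of files
    // per fired pane instead of waiting for the entire input to be processed.
    pipeline.run().waitUntilFinish();
  }
}
```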
7 changes: 7 additions & 0 deletions batch/src/main/java/org/openmrs/analytics/FhirEtlOptions.java
@@ -165,4 +165,11 @@ public interface FhirEtlOptions extends PipelineOptions {
int getNumFileShards();

void setNumFileShards(int value);

+ @Description("The number of seconds after which records are flushed into Parquet/text files; "
+     + "use 0 to disable (note this may have undesired memory implications).")
+ @Default.Integer(600)
+ int getSecondsToFlushFiles();
+
+ void setSecondsToFlushFiles(int value);
}
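A quick sketch of how the new option surfaces on the command line, using Beam's standard options factory (the value 120 is only illustrative):

```java
// Hypothetical parse of the new flag; Beam binds it to the getter/setter pair above.
FhirEtlOptions options = PipelineOptionsFactory.fromArgs("--secondsToFlushFiles=120")
    .withValidation().as(FhirEtlOptions.class);
// options.getSecondsToFlushFiles() == 120 here; omitting the flag yields the
// @Default.Integer value of 600, and 0 makes addWindow() a no-op (no windowing).
```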
@@ -84,11 +84,11 @@ FhirConverter createFhirConverter(CamelContext camelContext) throws Exception {

private String getDebeziumConfig() {
Map<String, String> debeziumConfigs = this.generalConfiguration.getDebeziumConfigurations();
return "debezium-mysql:" + debeziumConfigs.get("databaseHostName") + "?" + "databaseHostname="
return "debezium-mysql:" + debeziumConfigs.get("databaseServerName") + "?" + "databaseHostname="
+ debeziumConfigs.get("databaseHostName") + "&databaseServerId=" + debeziumConfigs.get("databaseServerId")
+ "&databasePort=" + debeziumConfigs.get("databasePort") + "&databaseUser="
+ debeziumConfigs.get("databaseUser") + "&databasePassword=" + debeziumConfigs.get("databasePassword")
+ "&databaseServerName=" + debeziumConfigs.get("databaseName") + "&databaseWhitelist="
+ "&databaseServerName=" + debeziumConfigs.get("databaseServerName") + "&databaseWhitelist="
+ debeziumConfigs.get("databaseSchema")
+ "&offsetStorage=org.apache.kafka.connect.storage.FileOffsetBackingStore" + "&offsetStorageFileName="
+ debeziumConfigs.get("databaseOffsetStorage") + "&databaseHistoryFileFilename="
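For reference, plugging the example values from utils/dbz_event_to_fhir_config.json (below) into the corrected method should produce a Camel endpoint URI along these lines. The tail of the method is cut off in this diff, so the final databaseHistoryFileFilename value is an assumption based on the config's databaseHistory entry; the URI is wrapped here for readability:

```
debezium-mysql:mysql?databaseHostname=localhost&databaseServerId=223344
  &databasePort=3306&databaseUser=root&databasePassword=debezium
  &databaseServerName=mysql&databaseWhitelist=openmrs
  &offsetStorage=org.apache.kafka.connect.storage.FileOffsetBackingStore
  &offsetStorageFileName=data/offset.dat
  &databaseHistoryFileFilename=data/dbhistory.dat
```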
@@ -60,10 +60,6 @@ public class EventConfiguration {
@Setter
private String databasePassword;

- @Getter
- @Setter
- private String databaseName;

@Getter
@Setter
private int databaseServerId;
3 changes: 1 addition & 2 deletions utils/dbz_event_to_fhir_config.json
@@ -1,12 +1,11 @@
{
"debeziumConfigurations": {
"databaseName" : "mysql",
"databaseServerName" : "mysql",
"databaseHostName" : "localhost",
"databaseServerId" : "223344",
"databasePort" : "3306",
"databaseUser" : "root",
"databasePassword" : "debezium",
"databaseServerName" : "mysql",
"databaseSchema" : "openmrs",
"databaseOffsetStorage" : "data/offset.dat",
"databaseHistory" : "data/dbhistory.dat",
