Skip to content

Commit

Permalink
$import Exit Status on Matrixed Resources results in NPE #2479
Browse files Browse the repository at this point in the history
Signed-off-by: Paul Bastide <pbastide@us.ibm.com>
  • Loading branch information
prb112 committed Jun 7, 2021
1 parent 093800b commit 3681c64
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private void source(BulkDataContext ctx) {

private void addImport(BulkDataContext ctx) {
ctx.setPartitionResourceType(props.getProperty(OperationFields.PARTITION_RESOURCETYPE));
ctx.setImportPartitionWorkitem(props.getProperty(OperationFields.PARTITTION_WORKITEM));
ctx.setImportPartitionWorkitem(props.getProperty(OperationFields.PARTITION_WORKITEM));
ctx.setDataSourceStorageType(props.getProperty(OperationFields.FHIR_IMPORT_STORAGE_TYPE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,19 @@ public class ChunkReader extends AbstractItemReader {

@Inject
@Any
@BatchProperty(name = OperationFields.PARTITTION_WORKITEM)
@BatchProperty(name = OperationFields.PARTITION_WORKITEM)
private String workItem;

@Inject
@Any
@BatchProperty(name = OperationFields.PARTITION_RESOURCETYPE)
private String resourceType;

@Inject
@Any
@BatchProperty(name = OperationFields.PARTITION_MATRIX)
private String matrix;

long numOfLinesToSkip = 0;

private BulkDataContext ctx = null;
Expand Down Expand Up @@ -89,6 +94,7 @@ public void open(Serializable checkpoint) throws Exception {
} else {
ImportTransientUserData chunkData = ImportTransientUserData.Builder.builder()
.importPartitionWorkitem(ctx.getImportPartitionWorkitem())
.matrixWorkItem(matrix)
.numOfProcessedResources(numOfLinesToSkip)
.importPartitionResourceType(ctx.getPartitionResourceType())
// This naming pattern is used in bulkdata operation to generate file links for import
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,14 @@ public class ChunkWriter extends AbstractItemWriter {

@Inject
@Any
@BatchProperty (name = OperationFields.PARTITTION_WORKITEM)
@BatchProperty (name = OperationFields.PARTITION_WORKITEM)
private String workItem;

@Inject
@Any
@BatchProperty(name = OperationFields.PARTITION_MATRIX)
private String matrix;

@Inject
@Any
@BatchProperty (name = OperationFields.PARTITION_RESOURCETYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public PartitionPlan mapPartitions() throws Exception {
int propCount = 0;
for (BulkDataSource fhirDataSource : bdSources) {
Properties p = new Properties();
p.setProperty(OperationFields.PARTITTION_WORKITEM, fhirDataSource.getUrl());
p.setProperty(OperationFields.PARTITION_WORKITEM, fhirDataSource.getUrl());
p.setProperty(OperationFields.PARTITION_RESOURCETYPE, fhirDataSource.getType());
p.setProperty(OperationFields.PARTITION_MATRIX, fhirDataSource.getOriginalLocation());
partitionProps[propCount++] = p;
}
pp.setPartitionProperties(partitionProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class ImportCheckPointData implements Serializable {
private static final long serialVersionUID = 2189917861035732241L;
// URL or COS/S3 object name.
protected String importPartitionWorkitem;
protected String matrixWorkItem;

// Values for metrics calculation.
protected long numOfProcessedResources = 0;
Expand Down Expand Up @@ -128,6 +129,7 @@ public String getImportPartitionResourceType() {
public static ImportCheckPointData fromImportTransientUserData(ImportTransientUserData userData) {
return ImportCheckPointData.Builder.builder()
.importPartitionWorkitem(userData.getImportPartitionWorkitem())
.matrixWorkItem(userData.matrixWorkItem)
.numOfProcessedResources(userData.getNumOfProcessedResources())
.importPartitionResourceType(userData.getImportPartitionResourceType())
.numOfImportedResources(userData.getNumOfImportedResources())
Expand Down Expand Up @@ -261,9 +263,18 @@ public void setInFlyRateBeginMilliSeconds(long inFlyRateBeginMilliSeconds) {
this.inFlyRateBeginMilliSeconds = inFlyRateBeginMilliSeconds;
}

/**
 * Sets the matrix work item for this partition's checkpoint data.
 * The matrix work item carries the original (pre-expansion) source location,
 * apparently used to build the exit-status key for matrixed imports —
 * see the key construction in the exit-status generator. TODO confirm.
 *
 * @param matrixWorkItem the original source location; may be {@code null}
 *        for non-matrixed partitions
 */
public void setMatrixWorkItem(String matrixWorkItem) {
this.matrixWorkItem = matrixWorkItem;
}

/**
 * Gets the matrix work item for this partition's checkpoint data.
 *
 * @return the original (pre-expansion) source location, or {@code null}
 *         when this partition was not created from a matrixed source
 *         (callers use the null check to pick the exit-status key)
 */
public String getMatrixWorkItem() {
return matrixWorkItem;
}

public static class Builder {

protected String importPartitionWorkitem;
protected String matrixWorkItem;
protected long numOfProcessedResources;
protected String importPartitionResourceType;
protected long numOfImportedResources;
Expand Down Expand Up @@ -296,6 +307,11 @@ public Builder importPartitionWorkitem(String importPartitionWorkitem) {
return this;
}

/**
 * Sets the matrix work item (the original, pre-expansion source location)
 * on the builder.
 *
 * @param matrixWorkItem the original source location; may be {@code null}
 *        for non-matrixed partitions
 * @return this builder, for fluent chaining
 */
public Builder matrixWorkItem(String matrixWorkItem) {
this.matrixWorkItem = matrixWorkItem;
return this;
}

public Builder numOfProcessedResources(long numOfProcessedResources) {
this.numOfProcessedResources = numOfProcessedResources;
return this;
Expand Down Expand Up @@ -389,6 +405,7 @@ public Builder inFlyRateBeginMilliSeconds(long inFlyRateBeginMilliSeconds) {
public ImportCheckPointData build() {
ImportCheckPointData importCheckPointData = new ImportCheckPointData();
importCheckPointData.importPartitionWorkitem = this.importPartitionWorkitem;
importCheckPointData.matrixWorkItem = this.matrixWorkItem;
importCheckPointData.numOfProcessedResources = this.numOfProcessedResources;
importCheckPointData.importPartitionResourceType = this.importPartitionResourceType;
importCheckPointData.numOfImportedResources = this.numOfImportedResources;
Expand All @@ -414,6 +431,7 @@ public ImportCheckPointData build() {
@Override
public String toString() {
return "ImportCheckPointData [importPartitionWorkitem=" + importPartitionWorkitem + ", numOfProcessedResources=" + numOfProcessedResources
+ ", matrixWorkItem=" + matrixWorkItem
+ ", numOfImportedResources=" + numOfImportedResources + ", numOfImportFailures=" + numOfImportFailures + ", totalReadMilliSeconds="
+ totalReadMilliSeconds + ", totalWriteMilliSeconds=" + totalWriteMilliSeconds + ", totalValidationMilliSeconds=" + totalValidationMilliSeconds
+ ", importFileSize=" + importFileSize + ", inFlyRateBeginMilliSeconds=" + inFlyRateBeginMilliSeconds + ", numOfToBeImported="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void setBufferReader(BufferedReader bufferReader) {
public static ImportTransientUserData fromImportCheckPointData(ImportCheckPointData importCheckPointData) {
return ImportTransientUserData.Builder.builder()
.importPartitionWorkitem(importCheckPointData.importPartitionWorkitem)
.matrixWorkItem(importCheckPointData.matrixWorkItem)
.numOfProcessedResources(importCheckPointData.numOfProcessedResources)
.importPartitionResourceType(importCheckPointData.importPartitionResourceType)
.numOfImportedResources(importCheckPointData.numOfImportedResources)
Expand Down Expand Up @@ -94,6 +95,11 @@ public Builder importPartitionWorkitem(String importPartitionWorkitem) {
return (Builder) super.importPartitionWorkitem(importPartitionWorkitem);
}

/**
 * Sets the matrix work item (the original, pre-expansion source location).
 * Delegates to the parent builder; the cast narrows the return type so
 * fluent chaining continues on this subclass's Builder.
 *
 * @param matrixWorkItem the original source location; may be {@code null}
 * @return this builder, for fluent chaining
 */
@Override
public Builder matrixWorkItem(String matrixWorkItem) {
return (Builder) super.matrixWorkItem(matrixWorkItem);
}

@Override
public Builder numOfProcessedResources(long numOfProcessedResources) {
return (Builder) super.numOfProcessedResources(numOfProcessedResources);
Expand Down Expand Up @@ -183,6 +189,7 @@ public Builder inFlyRateBeginMilliSeconds(long inFlyRateBeginMilliSeconds) {
public ImportTransientUserData build() {
ImportTransientUserData importTransientUserData = new ImportTransientUserData();
importTransientUserData.importPartitionWorkitem = this.importPartitionWorkitem;
importTransientUserData.matrixWorkItem = this.matrixWorkItem;
importTransientUserData.numOfProcessedResources = this.numOfProcessedResources;
importTransientUserData.importPartitionResourceType = this.importPartitionResourceType;
importTransientUserData.numOfImportedResources = this.numOfImportedResources;
Expand All @@ -207,6 +214,7 @@ public ImportTransientUserData build() {
@Override
public String toString() {
return "ImportTransientUserData [bufferStreamForImportError=" + bufferStreamForImportError + ", bufferStreamForImport=" + bufferStreamForImport
+ ", matrix=" + matrixWorkItem
+ ", inputStream=" + inputStream + ", bufferReader=" + bufferReader + ", importPartitionWorkitem=" + importPartitionWorkitem
+ ", numOfProcessedResources=" + numOfProcessedResources + ", numOfImportedResources=" + numOfImportedResources + ", numOfImportFailures="
+ numOfImportFailures + ", totalReadMilliSeconds=" + totalReadMilliSeconds + ", totalWriteMilliSeconds=" + totalWriteMilliSeconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
import java.util.Map;
import java.util.logging.Logger;

import com.ibm.fhir.bulkdata.jbatch.load.data.ImportCheckPointData;
import com.ibm.fhir.operation.bulkdata.model.type.OperationFields;

import jakarta.json.JsonArray;
import jakarta.json.JsonObject;
import jakarta.json.JsonValue;

import com.ibm.fhir.bulkdata.jbatch.load.data.ImportCheckPointData;
import com.ibm.fhir.operation.bulkdata.model.type.OperationFields;

/**
* Adapts the partitionSummaries and DatasourceArray into an Exit Status
*/
Expand All @@ -45,14 +45,22 @@ public String generateResultExitStatus() {
}

String[] resultInExitStatus = new String[sequenceNum];
System.out.println(partitionSummaries);
logger.fine(() -> "The partitions are " + partitionSummaries);
for (ImportCheckPointData partitionSummary : partitionSummaries) {
String key = partitionSummary.getImportPartitionResourceType() + ":" + partitionSummary.getImportPartitionWorkitem();
String key;
if (partitionSummary.getMatrixWorkItem() == null) {
key = partitionSummary.getImportPartitionResourceType() + ":" + partitionSummary.getImportPartitionWorkitem();
} else {
// must be matrixed
key = partitionSummary.getImportPartitionResourceType() + ":" + partitionSummary.getMatrixWorkItem();
}

if (!inputUrlSequenceMap.containsKey(key)) {
// Highly unlikely to hit now that the partition-resource-type is fixed
// So... this means that the Key is some how mutated.
logger.warning("Partition Key is incorrect '" + key + "'");
logger.warning("Partition Key is incorrect '" + key + "' matrix='" + partitionSummary.getMatrixWorkItem() + "'");
}

int index = inputUrlSequenceMap.get(key);
resultInExitStatus[index] = partitionSummary.getNumOfImportedResources() + ":" + partitionSummary.getNumOfImportFailures();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package com.ibm.fhir.bulkdata.load.partition.transformer.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;

Expand All @@ -26,40 +25,34 @@
*/
public class S3Transformer implements PartitionSourceTransformer {

private static final Logger logger = Logger.getLogger(S3Transformer.class.getName());
private static final Logger LOG = Logger.getLogger(S3Transformer.class.getName());

@Override
public List<BulkDataSource> transformToDataSources(String source, String type, String location) throws FHIRException {
S3Provider provider = new S3Provider(source);
List<BulkDataSource> sources = new ArrayList<>();

// Checks and return empty list.
S3Provider provider = new S3Provider(source);
if (!provider.exists()) {
provider.listBuckets();
return Collections.emptyList();
}

String loc = location.trim();
LOG.fine(() -> "Location being verified is'" + loc + "'");

List<BulkDataSource> sources = new ArrayList<>();
String continuationToken = null;
ListObjectsV2Result result = null;
boolean shouldContinue = true;
while (shouldContinue) {
result = provider.getListObject(loc, continuationToken);
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
System.out.println(objectSummary.getKey());
boolean isToBeProccessed = false;
LOG.fine(() -> "Object to be added [" + objectSummary.getKey() + "]");

if (!loc.isEmpty() && objectSummary.getKey().startsWith(loc)) {
// We read multiple files that start with the same pattern.
isToBeProccessed = true;
}
if (isToBeProccessed) {
logger.info("ObjectStorge Object -'" + objectSummary.getKey()
+ "' - '" + objectSummary.getSize() + "' bytes.");
if (objectSummary.getSize() >= 0) {
// We want these to line up when we align with the output of the Job Listener
sources.add(new BulkDataSource(type, objectSummary.getKey()));
}
LOG.fine(() -> "Object Summary -'" + objectSummary.getKey() + "' - '" + objectSummary.getSize() + "' bytes.");

BulkDataSource ds = new BulkDataSource(type, objectSummary.getKey());
ds.setOriginalLocation(location);
sources.add(ds);
}
}

Expand All @@ -72,6 +65,8 @@ public List<BulkDataSource> transformToDataSources(String source, String type, S
throw new FHIRLoadException("The source is not found '" + location + "'");
}

System.out.println(sources);

return sources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<property name="import.fhir.storagetype" value="#{jobParameters['import.fhir.storagetype']}"/>

<property name="partition.workitem" value="#{partitionPlan['partition.workitem']}"/>
<property name="partition.matrix" value="#{partitionPlan['partition.matrix']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
</properties>
</reader>
Expand All @@ -43,6 +44,7 @@
<property name="fhir.bulkdata.outcome" value="#{jobParameters['fhir.bulkdata.outcome']}"/>

<property name="partition.workitem" value="#{partitionPlan['partition.workitem']}"/>
<property name="partition.matrix" value="#{partitionPlan['partition.matrix']}"/>
<property name="partition.resourcetype" value="#{partitionPlan['partition.resourcetype']}"/>
</properties>
</writer>
Expand Down
2 changes: 1 addition & 1 deletion fhir-server-test/src/test/resources/test.properties
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ test.db2.enabled = false
test.performance.default = 1

# Bulk Data
test.bulkdata.import.enabled = true
test.bulkdata.import.enabled = false
test.bulkdata.export.enabled = false
# Commented out this is added back in during automation.
# test.bulkdata.path = /liberty-runtime/wlp/usr/servers/fhir-server/output
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public class BulkDataSource {

private String type;
private String url;
private String originalLocation = "";

public BulkDataSource(String type, String url) {
super();
Expand All @@ -36,8 +37,16 @@ public void setUrl(String url) {
this.url = url;
}

/**
 * Gets the original (pre-expansion) location for this data source.
 *
 * @return the location as originally specified; defaults to an empty
 *         string when never set (field is initialized to {@code ""})
 */
public String getOriginalLocation() {
return originalLocation;
}

/**
 * Sets the original (pre-expansion) location for this data source,
 * e.g. the location a matrixed S3 prefix was expanded from.
 *
 * @param originalLocation the location as originally specified by the caller
 */
public void setOriginalLocation(String originalLocation) {
this.originalLocation = originalLocation;
}

@Override
public String toString() {
return "BulkDataSource [type=" + type + ", url=" + url + "]";
return "BulkDataSource [type=" + type + ", url=" + url + ", originalLocation=" + originalLocation + "]";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public final class OperationFields {

// Partition
public static final String PARTITION_RESOURCETYPE = "partition.resourcetype";
public static final String PARTITTION_WORKITEM = "partition.workitem";
public static final String PARTITION_WORKITEM = "partition.workitem";
public static final String PARTITION_MATRIX = "partition.matrix";

// Parameters
public static final String FHIR_SEARCH_FROM_DATE = "fhir.search.fromdate";
Expand Down Expand Up @@ -90,5 +91,4 @@ public final class OperationFields {

public static final String IMPORT_INPUT_RESOURCE_TYPE = "type";
public static final String IMPORT_INPUT_RESOURCE_URL = "url";

}

0 comments on commit 3681c64

Please sign in to comment.