Skip to content

Commit

Permalink
allows adding Resources of multiple types in the same ndjson which c…
Browse files Browse the repository at this point in the history
…ould include unsupported resources. #3180

Signed-off-by: Paul Bastide <pbastide@us.ibm.com>
  • Loading branch information
prb112 committed Feb 16, 2022
1 parent e6a806d commit 5560089
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 11 deletions.
3 changes: 3 additions & 0 deletions docs/src/pages/guides/FHIRServerUsersGuide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2244,6 +2244,7 @@ This section contains reference information about each of the configuration prop
|`fhirServer/bulkdata/storageProviders/<source>/operationOutcomeProvider`|string| the default storage provider used to output Operation Outcomes (file, s3 only)|
|`fhirServer/bulkdata/storageProviders/<source>/accessType`|string| The s3 access type, `host` or `path` (s3 only) [Link](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html)|
|`fhirServer/bulkdata/storageProviders/<source>/requiresAccessToken`|boolean|controls the `$bulkdata-status` response to indicate Bulk Data storageprovider requires an accessToken using `requiresAccessToken`. When presigned URLs are enabled, this setting is overridden and shows as false in the $export response.|
|`fhirServer/bulkdata/storageProviders/<source>/allowAllResources`|string|Enables multiple resources to be loaded from a single ndjson.|
|`fhirServer/operations/erase/enabled`|boolean|Enables the $erase operation|
|`fhirServer/operations/erase/allowedRoles`|list|The list of allowed roles, allowed entries are: `FHIRUsers` every authenticated user, `FHIROperationAdmin` which is authenticated `FHIRAdmin` users|
|`fhirServer/operations/membermatch/enabled`|boolean|Enables or disables the $member-match|
Expand Down Expand Up @@ -2388,6 +2389,7 @@ This section contains reference information about each of the configuration prop
|`fhirServer/bulkdata/storageProviders/<source>/create`|false|
|`fhirServer/bulkdata/storageProviders/<source>/accessType`|`path`|
|`fhirServer/bulkdata/storageProviders/<source>/requiresAccessToken`|false|
|`fhirServer/bulkdata/storageProviders/<source>/allowAllResources`|false|
|`fhirServer/operations/erase/enabled`|false|
|`fhirServer/operations/erase/allowedRoles`|empty, all roles|
|`fhirServer/operations/membermatch/enabled`|true|
Expand Down Expand Up @@ -2565,6 +2567,7 @@ must restart the server for that change to take effect.
|`fhirServer/bulkdata/storageProviders/<source>/operationOutcomeProvider`|Y|Y|
|`fhirServer/bulkdata/storageProviders/<source>/accessType`|Y|Y|
|`fhirServer/bulkdata/storageProviders/<source>/requiresAccessToken`|Y|Y|
|`fhirServer/bulkdata/storageProviders/<source>/allowAllResources`|Y|Y|
|`fhirServer/operations/erase/enabled`|Y|Y|
|`fhirServer/operations/erase/allowedRoles`|Y|Y|
|`fhirServer/operations/membermatch/enabled`|Y|Y|
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2019, 2021
* (C) Copyright IBM Corp. 2019, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -141,18 +141,21 @@ public Object readItem() throws Exception {

ImportTransientUserData chunkData = (ImportTransientUserData) stepCtx.getTransientUserData();
numOfLinesToSkip = chunkData.getNumOfProcessedResources();
logger.fine(() -> "Number of lines to skip are: '" + numOfLinesToSkip + "'");

Provider wrapper = ProviderFactory.getSourceWrapper(ctx.getSource(), ctx.getDataSourceStorageType());
wrapper.registerTransient(chunkData);
if (logger.isLoggable(Level.FINE)) {
logger.fine(() -> "Number of lines to skip are: '" + numOfLinesToSkip + "'");
}

Provider provider = ProviderFactory.getSourceWrapper(ctx.getSource(), ctx.getDataSourceStorageType());
provider.registerTransient(chunkData);

long readStartTimeInMilliSeconds = System.currentTimeMillis();
wrapper.readResources(numOfLinesToSkip, ctx.getImportPartitionWorkitem());
provider.readResources(numOfLinesToSkip, ctx.getImportPartitionWorkitem());

long numOfParseFailures = wrapper.getNumberOfParseFailures();
long numOfLoaded = wrapper.getNumberOfLoaded();
long numOfParseFailures = provider.getNumberOfParseFailures();
long numOfLoaded = provider.getNumberOfLoaded();

List<Resource> resources = wrapper.getResources();
List<Resource> resources = provider.getResources();

chunkData.addToTotalReadMilliSeconds(System.currentTimeMillis() - readStartTimeInMilliSeconds);
chunkData.setNumOfParseFailures(numOfParseFailures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.AbstractItemWriter;
Expand Down Expand Up @@ -139,7 +140,7 @@ public void writeItems(List<java.lang.Object> arg0) throws Exception {
int failedNum = 0;
ImportTransientUserData chunkData = (ImportTransientUserData) stepCtx.getTransientUserData();

// Validate the resources first if required.
// Validates the Resources are valid for included profiles
if (adapter.shouldStorageProviderValidateResources(ctx.getSource())) {
long validationStartTimeInMilliSeconds = System.currentTimeMillis();
for (Object objResJsonList : arg0) {
Expand All @@ -155,7 +156,7 @@ public void writeItems(List<java.lang.Object> arg0) throws Exception {
} catch (FHIRValidationException | FHIROperationException e) {
logger.warning("Failed to validate '" + fhirResource.getId() + "' due to error: " + e.getMessage());
failedNum++;
failValidationIds.add(fhirResource.getId());
failValidationIds.add(fhirResource.getClass().getSimpleName() + "/" + fhirResource.getId());

if (adapter.shouldStorageProviderCollectOperationOutcomes(ctx.getSource())) {
OperationOutcome operationOutCome = FHIRUtil.buildOperationOutcome(e, false);
Expand All @@ -174,6 +175,20 @@ public void writeItems(List<java.lang.Object> arg0) throws Exception {
chunkData.addTotalValidationMilliSeconds(System.currentTimeMillis() - validationStartTimeInMilliSeconds);
}

// Validates the ResourceType matches
if (!adapter.shouldStorageProviderAllowAllResources(ctx.getSource())) {
for (Object objResJsonList : arg0) {
@SuppressWarnings("unchecked")
List<Resource> fhirResourceList = (List<Resource>) objResJsonList;

for (Resource fhirResource : fhirResourceList) {
if (this.resourceType != fhirResource.getClass().getSimpleName()) {
failValidationIds.add(fhirResource.getClass().getSimpleName() + "/" + fhirResource.getId());
}
}
}
}

// Begin writing the resources into DB.
long writeStartTimeInMilliSeconds = System.currentTimeMillis();
// Acquire a DB connection which will be used in the batch.
Expand All @@ -199,7 +214,7 @@ public void writeItems(List<java.lang.Object> arg0) throws Exception {
String id = fhirResource.getId();
processedNum++;
// Skip the resources which failed the validation
if (failValidationIds.contains(id)) {
if (failValidationIds.contains(fhirResource.getClass().getSimpleName() + "/" + id)) {
continue;
}
OperationOutcome operationOutcome;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,4 +548,14 @@ default boolean isStorageProviderParquetEnabled(String provider) {
* @return
*/
boolean getStorageProviderUsesRequestAccessToken(String provider);

/**
* allows multiple resources in a single file.
*
* @implNote this default is false.
*
* @param source
* @return
*/
boolean shouldStorageProviderAllowAllResources(String source);
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,9 @@ public boolean getStorageProviderUsesRequestAccessToken(String provider) {
return FHIRConfigHelper.getBooleanProperty("fhirServer/bulkdata/storageProviders/" + provider + "/requiresAccessToken", Boolean.FALSE)
&& !isStorageProviderHmacPresigned(provider);
}

@Override
public boolean shouldStorageProviderAllowAllResources(String provider) {
return FHIRConfigHelper.getBooleanProperty("fhirServer/bulkdata/storageProviders/" + provider + "/allowAllResources", Boolean.FALSE);
}
}

0 comments on commit 5560089

Please sign in to comment.