Skip to content

Commit

Permalink
Update BulkData 2.0.0: Update the OperationDefinitions and Capability…
Browse files Browse the repository at this point in the history
…Statement #3082  (#3441)

* Update for performance #2390

Signed-off-by: Paul Bastide <pbastide@us.ibm.com>

* Optimization for Check on Resource Type

Signed-off-by: Paul Bastide <pbastide@us.ibm.com>
  • Loading branch information
prb112 authored and lmsurpre committed Mar 9, 2022
1 parent ed4b3ca commit 353c846
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.ibm.fhir.bulkdata.jbatch.export.patient;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
Expand All @@ -20,11 +21,17 @@
import javax.enterprise.context.Dependent;
import javax.inject.Inject;

import com.ibm.fhir.bulkdata.export.system.resource.SystemExportResourceHandler;
import com.ibm.fhir.bulkdata.jbatch.context.BatchContextAdapter;
import com.ibm.fhir.operation.bulkdata.config.ConfigurationAdapter;
import com.ibm.fhir.operation.bulkdata.config.ConfigurationFactory;
import com.ibm.fhir.operation.bulkdata.model.type.BulkDataContext;
import com.ibm.fhir.operation.bulkdata.model.type.OperationFields;
import com.ibm.fhir.persistence.FHIRPersistence;
import com.ibm.fhir.persistence.HistorySortOrder;
import com.ibm.fhir.persistence.ResourceChangeLogRecord;
import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper;
import com.ibm.fhir.persistence.helper.FHIRTransactionHelper;
import com.ibm.fhir.search.compartment.CompartmentHelper;

@Dependent
Expand Down Expand Up @@ -61,13 +68,36 @@ public PartitionPlan mapPartitions() throws Exception {
ConfigurationAdapter adapter = ConfigurationFactory.getInstance();
adapter.registerRequestContext(ctx.getTenantId(), ctx.getDatastoreId(), ctx.getIncomingUrl());

// Note we're already running inside a transaction (started by the JavaBatch framework)
// so this txn will just wrap it...the commit won't happen until the checkpoint
SystemExportResourceHandler handler = new SystemExportResourceHandler();
FHIRPersistenceHelper fhirPersistenceHelper = new FHIRPersistenceHelper(handler.getSearchHelper());
FHIRPersistence fhirPersistence = fhirPersistenceHelper.getFHIRPersistenceImplementation();
FHIRTransactionHelper txn = new FHIRTransactionHelper(fhirPersistence.getTransaction());
txn.begin();

// Check resourceType needs to be processed
List<String> target = new ArrayList<>();
try {
for (String resourceType : resourceTypes) {
List<ResourceChangeLogRecord> resourceResults = fhirPersistence.changes(1, null, null, null, Arrays.asList(resourceType), false, HistorySortOrder.NONE);

// Early Exit Logic
if (!resourceResults.isEmpty()) {
target.add(resourceType);
}
}
} finally {
txn.end();
}

PartitionPlanImpl pp = new PartitionPlanImpl();
pp.setPartitions(resourceTypes.size());
pp.setThreads(Math.min(adapter.getCoreMaxPartitions(), resourceTypes.size()));
Properties[] partitionProps = new Properties[resourceTypes.size()];
pp.setPartitions(target.size());
pp.setThreads(Math.min(adapter.getCoreMaxPartitions(), target.size()));
Properties[] partitionProps = new Properties[target.size()];

int propCount = 0;
for (String resourceType : resourceTypes) {
for (String resourceType : target) {
Properties p = new Properties();
p.setProperty(OperationFields.PARTITION_RESOURCETYPE, resourceType);
partitionProps[propCount++] = p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.ibm.fhir.bulkdata.jbatch.export.system;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
Expand All @@ -20,10 +21,17 @@
import javax.enterprise.context.Dependent;
import javax.inject.Inject;

import com.ibm.fhir.bulkdata.export.system.resource.SystemExportResourceHandler;
import com.ibm.fhir.bulkdata.jbatch.context.BatchContextAdapter;
import com.ibm.fhir.operation.bulkdata.config.ConfigurationAdapter;
import com.ibm.fhir.operation.bulkdata.config.ConfigurationFactory;
import com.ibm.fhir.operation.bulkdata.model.type.BulkDataContext;
import com.ibm.fhir.operation.bulkdata.model.type.OperationFields;
import com.ibm.fhir.persistence.FHIRPersistence;
import com.ibm.fhir.persistence.HistorySortOrder;
import com.ibm.fhir.persistence.ResourceChangeLogRecord;
import com.ibm.fhir.persistence.helper.FHIRPersistenceHelper;
import com.ibm.fhir.persistence.helper.FHIRTransactionHelper;

/**
* Generates the {@link PartitionPlan} describing how the system export work is
Expand Down Expand Up @@ -51,16 +59,43 @@ public PartitionPlan mapPartitions() throws Exception {

BulkDataContext ctx = ctxAdapter.getStepContextForExportPartitionMapper();

// Register the context to get the right configuration.
ConfigurationAdapter adapter = ConfigurationFactory.getInstance();
adapter.registerRequestContext(ctx.getTenantId(), ctx.getDatastoreId(), ctx.getIncomingUrl());

// We know these are real resource types.
List<String> resourceTypes = Arrays.asList(ctx.getFhirResourceTypes().split("\\s*,\\s*"));

// Note we're already running inside a transaction (started by the JavaBatch framework)
// so this txn will just wrap it...the commit won't happen until the checkpoint
SystemExportResourceHandler handler = new SystemExportResourceHandler();
FHIRPersistenceHelper fhirPersistenceHelper = new FHIRPersistenceHelper(handler.getSearchHelper());
FHIRPersistence fhirPersistence = fhirPersistenceHelper.getFHIRPersistenceImplementation();
FHIRTransactionHelper txn = new FHIRTransactionHelper(fhirPersistence.getTransaction());
txn.begin();

// Check resourceType needs to be processed
List<String> target = new ArrayList<>();
try {
for (String resourceType : resourceTypes) {
List<ResourceChangeLogRecord> resourceResults = fhirPersistence.changes(1, null, null, null, Arrays.asList(resourceType), false, HistorySortOrder.NONE);

// Early Exit Logic
if (!resourceResults.isEmpty()) {
target.add(resourceType);
}
}
} finally {
txn.end();
}

PartitionPlanImpl pp = new PartitionPlanImpl();
pp.setPartitions(resourceTypes.size());
pp.setThreads(Math.min(ConfigurationFactory.getInstance().getCoreMaxPartitions(), resourceTypes.size()));
Properties[] partitionProps = new Properties[resourceTypes.size()];
pp.setPartitions(target.size());
pp.setThreads(Math.min(ConfigurationFactory.getInstance().getCoreMaxPartitions(), target.size()));
Properties[] partitionProps = new Properties[target.size()];

int propCount = 0;
for (String resourceType : resourceTypes) {
for (String resourceType : target) {
Properties p = new Properties();
p.setProperty(OperationFields.PARTITION_RESOURCETYPE, resourceType);
partitionProps[propCount++] = p;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@
},
"pageSize": 100,
"batchIdEncryptionKey": "change-password",
"maxPartitions": 3,
"maxPartitions": 5,
"maxInputs": 5
},
"storageProviders": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@
},
"pageSize": 100,
"batchIdEncryptionKey": "change-password",
"maxPartitions": 3,
"maxPartitions": 5,
"maxInputs": 5,
"maxChunkReadTime": "90000",
"systemExportImpl": "fast",
Expand Down

0 comments on commit 353c846

Please sign in to comment.