From 1dc1d32606162dbea6314b5fdf1317b6e0ede53e Mon Sep 17 00:00:00 2001 From: Paul Bastide Date: Wed, 16 Feb 2022 12:56:45 -0500 Subject: [PATCH 1/2] ImportPartitionAnalyzer not reporting infly rate on threshold #3000 ImportPartitionAnalyzer - modify the infly rate reporting message and changed the message to report out Resources/second and report which workitem is triggering the message. Improve accuracy of the infly rate calculation Sample Message: [INFO ] Import in-fly rate: [286/286/r4_AllergyIntolerance.ndjson/AllergyIntolerance] reportingThreshold=[2000] resources imported in 12610 milliseconds, ImportRate: [0.064] Resources/milliseconds Instance/Execution/File/ResourceType threshold milliseconds and rate. Signed-off-by: Paul Bastide --- .../jbatch/load/ImportPartitionAnalyzer.java | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/load/ImportPartitionAnalyzer.java b/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/load/ImportPartitionAnalyzer.java index 62457628bd5..7f9ca10864b 100644 --- a/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/load/ImportPartitionAnalyzer.java +++ b/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/load/ImportPartitionAnalyzer.java @@ -1,5 +1,5 @@ /* - * (C) Copyright IBM Corp. 2020, 2021 + * (C) Copyright IBM Corp. 2020, 2022 * * SPDX-License-Identifier: Apache-2.0 */ @@ -30,6 +30,7 @@ public class ImportPartitionAnalyzer implements PartitionAnalyzer { private static final Logger logger = Logger.getLogger(ImportPartitionAnalyzer.class.getName()); private static final ConfigurationAdapter adapter = ConfigurationFactory.getInstance(); + private static final DecimalFormat FORMAT = new DecimalFormat("#0.000"); @Inject JobContext jobContext; @@ -72,30 +73,34 @@ public void analyzeCollectorData(Serializable data) { importedResourceTypeInFlySummaries.get(partitionSummaryForMetrics.getImportPartitionResourceType()); if (importedResourceTypeInFlySummary == null) { // Instantiate a partition summary for this resourceType and add it to the list + // Note we're caching here in case there are more than 1 executions of the job. + // (for instance a recoverable failure of the job during execution). importedResourceTypeInFlySummary = ImportCheckPointData.Builder.builder() .importPartitionResourceType(partitionSummaryForMetrics.getImportPartitionResourceType()) - .numOfProcessedResources(adapter.getImportNumberOfFhirResourcesPerRead(null)) + .importPartitionWorkitem(partitionSummaryForMetrics.getImportPartitionWorkitem()) + .numOfProcessedResources(partitionSummaryForMetrics.getNumOfProcessedResources()) .inFlyRateBeginMilliSeconds(partitionSummaryForMetrics.getInFlyRateBeginMilliSeconds()) .build(); importedResourceTypeInFlySummaries.put(partitionSummaryForMetrics.getImportPartitionResourceType(), importedResourceTypeInFlySummary); } else { - // Add info to the object thats already in the list - importedResourceTypeInFlySummary.setNumOfProcessedResources(importedResourceTypeInFlySummary.getNumOfProcessedResources() - + adapter.getImportNumberOfFhirResourcesPerRead(null)); - - if (importedResourceTypeInFlySummary.getNumOfProcessedResources() % adapter.getImportInflyRateNumberOfFhirResources(null) == 0) { + // Add current summary to the local cache. + importedResourceTypeInFlySummary.addToNumOfProcessedResources(partitionSummaryForMetrics.getNumOfToBeImported()); + if ((importedResourceTypeInFlySummary.getNumOfProcessedResources() - importedResourceTypeInFlySummary.getLastChecked()) > adapter.getImportInflyRateNumberOfFhirResources(null)) { long currentTimeMilliSeconds = System.currentTimeMillis(); - double jobProcessingSeconds = (currentTimeMilliSeconds - importedResourceTypeInFlySummary.getInFlyRateBeginMilliSeconds()) / 1000.0; - jobProcessingSeconds = jobProcessingSeconds < 1 ? 1.0 : jobProcessingSeconds; + long time = currentTimeMilliSeconds - importedResourceTypeInFlySummary.getInFlyRateBeginMilliSeconds(); + double rate = partitionSummaryForMetrics.getNumOfToBeImported() / (1.0 * time); // log the in-fly rate. - logger.info(adapter.getImportInflyRateNumberOfFhirResources(null) + " " + importedResourceTypeInFlySummary.getImportPartitionResourceType() - + " resources imported in " + jobProcessingSeconds + " seconds, ImportRate: " - + new DecimalFormat("#0.00").format(adapter.getImportInflyRateNumberOfFhirResources(null) / jobProcessingSeconds) + "/Second"); + logger.info("Import in-fly rate: [" + jobCtx.getInstanceId() + "/" + jobCtx.getExecutionId() + "/" + + importedResourceTypeInFlySummary.getImportPartitionWorkitem() + "/" + importedResourceTypeInFlySummary.getImportPartitionResourceType() + + "] reportingThreshold=[" + adapter.getImportInflyRateNumberOfFhirResources(null) + + "] resources imported in " + time + " milliseconds, ImportRate: [" + + FORMAT.format(rate) + "] Resources/milliseconds"); importedResourceTypeInFlySummary.setInFlyRateBeginMilliSeconds(currentTimeMilliSeconds); + importedResourceTypeInFlySummary.setLastChecked(importedResourceTypeInFlySummary.getNumOfProcessedResources()); } } } catch (Exception e) { From 0b706a18d3845bc299764d9cda11b681f2dd397d Mon Sep 17 00:00:00 2001 From: Paul Bastide Date: Wed, 16 Feb 2022 13:23:50 -0500 Subject: [PATCH 2/2] Update data checkpoint Signed-off-by: Paul Bastide --- .../jbatch/load/data/ImportCheckPointData.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/load/data/ImportCheckPointData.java b/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/load/data/ImportCheckPointData.java index 508449ecd8f..ddf6bcf7c2f 100644 --- a/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/load/data/ImportCheckPointData.java +++ b/fhir-bulkdata-webapp/src/main/java/com/ibm/fhir/bulkdata/jbatch/load/data/ImportCheckPointData.java @@ -61,6 +61,9 @@ public class ImportCheckPointData implements Serializable { // ETags for COS/S3 multiple-parts upload. protected List dataPacksForFailureOperationOutcomes = new ArrayList<>(); + // Used to track last resource count when in fly rate was logged + protected long lastChecked = 0; + protected ImportCheckPointData() { super(); } @@ -137,6 +140,14 @@ public void setCurrentBytes(long currentBytes) { this.currentBytes = currentBytes; } + public long getLastChecked() { + return lastChecked; + } + + public void setLastChecked(long lastChecked) { + this.lastChecked = lastChecked; + } + public static ImportCheckPointData fromImportTransientUserData(ImportTransientUserData userData) { return ImportCheckPointData.Builder.builder() .importPartitionWorkitem(userData.getImportPartitionWorkitem())