Skip to content

Commit

Permalink
Merge pull request #3358 from IBM/issue-3000
Browse files Browse the repository at this point in the history
ImportPartitionAnalyzer not reporting infly rate on threshold #3000
  • Loading branch information
lmsurpre authored Feb 16, 2022
2 parents fff93c9 + 0b706a1 commit 6a0e929
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2020, 2021
* (C) Copyright IBM Corp. 2020, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -30,6 +30,7 @@ public class ImportPartitionAnalyzer implements PartitionAnalyzer {
private static final Logger logger = Logger.getLogger(ImportPartitionAnalyzer.class.getName());

private static final ConfigurationAdapter adapter = ConfigurationFactory.getInstance();
private static final DecimalFormat FORMAT = new DecimalFormat("#0.000");

@Inject
JobContext jobContext;
Expand Down Expand Up @@ -72,30 +73,34 @@ public void analyzeCollectorData(Serializable data) {
importedResourceTypeInFlySummaries.get(partitionSummaryForMetrics.getImportPartitionResourceType());
if (importedResourceTypeInFlySummary == null) {
// Instantiate a partition summary for this resourceType and add it to the list
// Note we're caching here in case there are more than 1 executions of the job.
// (for instance a recoverable failure of the job during execution).
importedResourceTypeInFlySummary =
ImportCheckPointData.Builder.builder()
.importPartitionResourceType(partitionSummaryForMetrics.getImportPartitionResourceType())
.numOfProcessedResources(adapter.getImportNumberOfFhirResourcesPerRead(null))
.importPartitionWorkitem(partitionSummaryForMetrics.getImportPartitionWorkitem())
.numOfProcessedResources(partitionSummaryForMetrics.getNumOfProcessedResources())
.inFlyRateBeginMilliSeconds(partitionSummaryForMetrics.getInFlyRateBeginMilliSeconds())
.build();

importedResourceTypeInFlySummaries.put(partitionSummaryForMetrics.getImportPartitionResourceType(), importedResourceTypeInFlySummary);
} else {
// Add info to the object thats already in the list
importedResourceTypeInFlySummary.setNumOfProcessedResources(importedResourceTypeInFlySummary.getNumOfProcessedResources()
+ adapter.getImportNumberOfFhirResourcesPerRead(null));

if (importedResourceTypeInFlySummary.getNumOfProcessedResources() % adapter.getImportInflyRateNumberOfFhirResources(null) == 0) {
// Add current summary to the local cache.
importedResourceTypeInFlySummary.addToNumOfProcessedResources(partitionSummaryForMetrics.getNumOfToBeImported());
if ((importedResourceTypeInFlySummary.getNumOfProcessedResources() - importedResourceTypeInFlySummary.getLastChecked()) > adapter.getImportInflyRateNumberOfFhirResources(null)) {
long currentTimeMilliSeconds = System.currentTimeMillis();
double jobProcessingSeconds = (currentTimeMilliSeconds - importedResourceTypeInFlySummary.getInFlyRateBeginMilliSeconds()) / 1000.0;
jobProcessingSeconds = jobProcessingSeconds < 1 ? 1.0 : jobProcessingSeconds;
long time = currentTimeMilliSeconds - importedResourceTypeInFlySummary.getInFlyRateBeginMilliSeconds();
double rate = partitionSummaryForMetrics.getNumOfToBeImported() / (1.0 * time);

// log the in-fly rate.
logger.info(adapter.getImportInflyRateNumberOfFhirResources(null) + " " + importedResourceTypeInFlySummary.getImportPartitionResourceType()
+ " resources imported in " + jobProcessingSeconds + " seconds, ImportRate: "
+ new DecimalFormat("#0.00").format(adapter.getImportInflyRateNumberOfFhirResources(null) / jobProcessingSeconds) + "/Second");
logger.info("Import in-fly rate: [" + jobCtx.getInstanceId() + "/" + jobCtx.getExecutionId() + "/"
+ importedResourceTypeInFlySummary.getImportPartitionWorkitem() + "/" + importedResourceTypeInFlySummary.getImportPartitionResourceType()
+ "] reportingThreshold=[" + adapter.getImportInflyRateNumberOfFhirResources(null)
+ "] resources imported in " + time + " milliseconds, ImportRate: ["
+ FORMAT.format(rate) + "] Resources/milliseconds");

importedResourceTypeInFlySummary.setInFlyRateBeginMilliSeconds(currentTimeMilliSeconds);
importedResourceTypeInFlySummary.setLastChecked(importedResourceTypeInFlySummary.getNumOfProcessedResources());
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class ImportCheckPointData implements Serializable {
// ETags for COS/S3 multiple-parts upload.
protected List<PartETag> dataPacksForFailureOperationOutcomes = new ArrayList<>();

// Used to track last resource count when in fly rate was logged
protected long lastChecked = 0;

protected ImportCheckPointData() {
super();
}
Expand Down Expand Up @@ -137,6 +140,14 @@ public void setCurrentBytes(long currentBytes) {
this.currentBytes = currentBytes;
}

public long getLastChecked() {
return lastChecked;
}

public void setLastChecked(long lastChecked) {
this.lastChecked = lastChecked;
}

public static ImportCheckPointData fromImportTransientUserData(ImportTransientUserData userData) {
return ImportCheckPointData.Builder.builder()
.importPartitionWorkitem(userData.getImportPartitionWorkitem())
Expand Down

0 comments on commit 6a0e929

Please sign in to comment.