Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ImportPartitionAnalyzer not reporting infly rate on threshold #3000 #3358

Merged
merged 2 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright IBM Corp. 2020, 2021
* (C) Copyright IBM Corp. 2020, 2022
*
* SPDX-License-Identifier: Apache-2.0
*/
Expand Down Expand Up @@ -30,6 +30,7 @@ public class ImportPartitionAnalyzer implements PartitionAnalyzer {
private static final Logger logger = Logger.getLogger(ImportPartitionAnalyzer.class.getName());

private static final ConfigurationAdapter adapter = ConfigurationFactory.getInstance();
private static final DecimalFormat FORMAT = new DecimalFormat("#0.000");

@Inject
JobContext jobContext;
Expand Down Expand Up @@ -72,30 +73,34 @@ public void analyzeCollectorData(Serializable data) {
importedResourceTypeInFlySummaries.get(partitionSummaryForMetrics.getImportPartitionResourceType());
if (importedResourceTypeInFlySummary == null) {
// Instantiate a partition summary for this resourceType and add it to the list
// Note we're caching here in case there are more than 1 executions of the job.
// (for instance a recoverable failure of the job during execution).
importedResourceTypeInFlySummary =
ImportCheckPointData.Builder.builder()
.importPartitionResourceType(partitionSummaryForMetrics.getImportPartitionResourceType())
.numOfProcessedResources(adapter.getImportNumberOfFhirResourcesPerRead(null))
.importPartitionWorkitem(partitionSummaryForMetrics.getImportPartitionWorkitem())
.numOfProcessedResources(partitionSummaryForMetrics.getNumOfProcessedResources())
.inFlyRateBeginMilliSeconds(partitionSummaryForMetrics.getInFlyRateBeginMilliSeconds())
.build();

importedResourceTypeInFlySummaries.put(partitionSummaryForMetrics.getImportPartitionResourceType(), importedResourceTypeInFlySummary);
} else {
// Add info to the object thats already in the list
importedResourceTypeInFlySummary.setNumOfProcessedResources(importedResourceTypeInFlySummary.getNumOfProcessedResources()
+ adapter.getImportNumberOfFhirResourcesPerRead(null));

if (importedResourceTypeInFlySummary.getNumOfProcessedResources() % adapter.getImportInflyRateNumberOfFhirResources(null) == 0) {
// Add current summary to the local cache.
importedResourceTypeInFlySummary.addToNumOfProcessedResources(partitionSummaryForMetrics.getNumOfToBeImported());
if ((importedResourceTypeInFlySummary.getNumOfProcessedResources() - importedResourceTypeInFlySummary.getLastChecked()) > adapter.getImportInflyRateNumberOfFhirResources(null)) {
long currentTimeMilliSeconds = System.currentTimeMillis();
double jobProcessingSeconds = (currentTimeMilliSeconds - importedResourceTypeInFlySummary.getInFlyRateBeginMilliSeconds()) / 1000.0;
jobProcessingSeconds = jobProcessingSeconds < 1 ? 1.0 : jobProcessingSeconds;
long time = currentTimeMilliSeconds - importedResourceTypeInFlySummary.getInFlyRateBeginMilliSeconds();
double rate = partitionSummaryForMetrics.getNumOfToBeImported() / (1.0 * time);

// log the in-fly rate.
logger.info(adapter.getImportInflyRateNumberOfFhirResources(null) + " " + importedResourceTypeInFlySummary.getImportPartitionResourceType()
+ " resources imported in " + jobProcessingSeconds + " seconds, ImportRate: "
+ new DecimalFormat("#0.00").format(adapter.getImportInflyRateNumberOfFhirResources(null) / jobProcessingSeconds) + "/Second");
logger.info("Import in-fly rate: [" + jobCtx.getInstanceId() + "/" + jobCtx.getExecutionId() + "/"
+ importedResourceTypeInFlySummary.getImportPartitionWorkitem() + "/" + importedResourceTypeInFlySummary.getImportPartitionResourceType()
+ "] reportingThreshold=[" + adapter.getImportInflyRateNumberOfFhirResources(null)
+ "] resources imported in " + time + " milliseconds, ImportRate: ["
+ FORMAT.format(rate) + "] Resources/milliseconds");

importedResourceTypeInFlySummary.setInFlyRateBeginMilliSeconds(currentTimeMilliSeconds);
importedResourceTypeInFlySummary.setLastChecked(importedResourceTypeInFlySummary.getNumOfProcessedResources());
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class ImportCheckPointData implements Serializable {
// ETags for COS/S3 multiple-parts upload.
protected List<PartETag> dataPacksForFailureOperationOutcomes = new ArrayList<>();

// Used to track last resource count when in fly rate was logged
protected long lastChecked = 0;

protected ImportCheckPointData() {
super();
}
Expand Down Expand Up @@ -137,6 +140,14 @@ public void setCurrentBytes(long currentBytes) {
this.currentBytes = currentBytes;
}

public long getLastChecked() {
return lastChecked;
}

public void setLastChecked(long lastChecked) {
this.lastChecked = lastChecked;
}

public static ImportCheckPointData fromImportTransientUserData(ImportTransientUserData userData) {
return ImportCheckPointData.Builder.builder()
.importPartitionWorkitem(userData.getImportPartitionWorkitem())
Expand Down