Skip to content

Commit

Permalink
Merge pull request #868 from QAQJ/consumer-level-metrics
Browse files Browse the repository at this point in the history
Adding a new metric: Application-level MillisBehindLatest
  • Loading branch information
avahuang0429 authored Dec 6, 2021
2 parents 25714f5 + e220791 commit bedae95
Showing 1 changed file with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
@KinesisClientInternalApi
public class ProcessTask implements ConsumerTask {
private static final String PROCESS_TASK_OPERATION = "ProcessTask";
private static final String APPLICATION_TRACKER_OPERATION = "ApplicationTracker";
private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed";
private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed";
private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords";
Expand Down Expand Up @@ -112,20 +113,28 @@ public ProcessTask(@NonNull ShardInfo shardInfo,
*/
@Override
public TaskResult call() {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
/**
* NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension,
* therefore all data added to appScope, although from different shard consumer, will be sent to the same metric,
* which is the app-level MillsBehindLatest metric.
*/
final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION);
final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION);
shardInfo.streamIdentifierSerOpt()
.ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addShardId(scope, shardInfo.shardId());
.ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId)));
MetricsUtil.addShardId(shardScope, shardInfo.shardId());
long startTimeMillis = System.currentTimeMillis();
boolean success = false;
try {
scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
shardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY);
shardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY);
Exception exception = null;

try {
if (processRecordsInput.millisBehindLatest() != null) {
scope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
shardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
appScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(),
StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY);
}

Expand All @@ -142,11 +151,11 @@ public TaskResult call() {
}

if (!records.isEmpty()) {
scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
shardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
}

recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber(
scope, records, recordProcessorCheckpointer.lastCheckpointValue(),
shardScope, records, recordProcessorCheckpointer.lastCheckpointValue(),
recordProcessorCheckpointer.largestPermittedCheckpointValue()));

if (shouldCallProcessRecords(records)) {
Expand All @@ -165,8 +174,9 @@ public TaskResult call() {
}
return new TaskResult(exception);
} finally {
MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY);
MetricsUtil.endScope(scope);
MetricsUtil.addSuccessAndLatency(shardScope, success, startTimeMillis, MetricsLevel.SUMMARY);
MetricsUtil.endScope(shardScope);
MetricsUtil.endScope(appScope);
}
}

Expand Down

0 comments on commit bedae95

Please sign in to comment.