From b43c14476bfa8fdb2b0f17f7af5b88e7ecd85b61 Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Thu, 18 Nov 2021 10:48:23 -0800 Subject: [PATCH 1/4] first commit for app-level mills_behind_latest metric --- .../amazon/kinesis/lifecycle/ProcessTask.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index b3ba8a7de..bd3b9721a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -45,6 +45,7 @@ @KinesisClientInternalApi public class ProcessTask implements ConsumerTask { private static final String PROCESS_TASK_OPERATION = "ProcessTask"; + private static final String APPLICATION_LEVEL_METRICS = "ApplicationLevelMetrics"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; @@ -112,20 +113,23 @@ public ProcessTask(@NonNull ShardInfo shardInfo, */ @Override public TaskResult call() { - final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); + final MetricsScope scope_app = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); + final MetricsScope scope_shard = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() - .ifPresent(streamId -> MetricsUtil.addStreamId(scope, StreamIdentifier.multiStreamInstance(streamId))); - MetricsUtil.addShardId(scope, shardInfo.shardId()); + .ifPresent(streamId -> MetricsUtil.addStreamId(scope_shard, StreamIdentifier.multiStreamInstance(streamId))); + MetricsUtil.addShardId(scope_shard, shardInfo.shardId()); long startTimeMillis = System.currentTimeMillis(); boolean success = false; try { - scope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); - scope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); + scope_shard.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope_shard.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); Exception exception = null; try { if (processRecordsInput.millisBehindLatest() != null) { - scope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + scope_shard.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); + scope_app.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); } @@ -142,11 +146,11 @@ public TaskResult call() { } if (!records.isEmpty()) { - scope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope_shard.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); } recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( - scope, records, recordProcessorCheckpointer.lastCheckpointValue(), + scope_shard, records, recordProcessorCheckpointer.lastCheckpointValue(), recordProcessorCheckpointer.largestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { @@ -165,8 +169,9 @@ public TaskResult call() { } return new TaskResult(exception); } finally { - MetricsUtil.addSuccessAndLatency(scope, success, startTimeMillis, MetricsLevel.SUMMARY); - MetricsUtil.endScope(scope); + MetricsUtil.addSuccessAndLatency(scope_shard, success, startTimeMillis, MetricsLevel.SUMMARY); + MetricsUtil.endScope(scope_shard); + MetricsUtil.endScope(scope_app); } } From 66e5dfecb596bd7cd40fd8fcbf3036e2b66a8c15 Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Wed, 1 Dec 2021 11:38:21 -0800 Subject: [PATCH 2/4] fix some naming convention issue --- .../amazon/kinesis/lifecycle/ProcessTask.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index bd3b9721a..f01acd895 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -113,23 +113,23 @@ public ProcessTask(@NonNull ShardInfo shardInfo, */ @Override public TaskResult call() { - final MetricsScope scope_app = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); - final MetricsScope scope_shard = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); + final MetricsScope AppScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); + final MetricsScope ShardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() - .ifPresent(streamId -> MetricsUtil.addStreamId(scope_shard, StreamIdentifier.multiStreamInstance(streamId))); - MetricsUtil.addShardId(scope_shard, shardInfo.shardId()); + .ifPresent(streamId -> MetricsUtil.addStreamId(ShardScope, StreamIdentifier.multiStreamInstance(streamId))); + MetricsUtil.addShardId(ShardScope, shardInfo.shardId()); long startTimeMillis = System.currentTimeMillis(); boolean success = false; try { - scope_shard.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); - scope_shard.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); + ShardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); + ShardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); Exception exception = null; try { if (processRecordsInput.millisBehindLatest() != null) { - scope_shard.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + ShardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); - scope_app.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + AppScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); } @@ -146,11 +146,11 @@ public TaskResult call() { } if (!records.isEmpty()) { - scope_shard.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + ShardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); } recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( - scope_shard, records, recordProcessorCheckpointer.lastCheckpointValue(), + ShardScope, records, recordProcessorCheckpointer.lastCheckpointValue(), recordProcessorCheckpointer.largestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { @@ -169,9 +169,9 @@ public TaskResult call() { } return new TaskResult(exception); } finally { - MetricsUtil.addSuccessAndLatency(scope_shard, success, startTimeMillis, MetricsLevel.SUMMARY); - MetricsUtil.endScope(scope_shard); - MetricsUtil.endScope(scope_app); + MetricsUtil.addSuccessAndLatency(ShardScope, success, startTimeMillis, MetricsLevel.SUMMARY); + MetricsUtil.endScope(ShardScope); + MetricsUtil.endScope(AppScope); } } From a2e0269826b4833410dcf278b8049e4fe1fd4f3f Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Wed, 1 Dec 2021 11:58:19 -0800 Subject: [PATCH 3/4] add some comments to explain the metric scopes --- .../amazon/kinesis/lifecycle/ProcessTask.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index f01acd895..cd460e3e4 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -113,23 +113,28 @@ public ProcessTask(@NonNull ShardInfo shardInfo, */ @Override public TaskResult call() { - final MetricsScope AppScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); - final MetricsScope ShardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); + /** + * NOTE: the difference between appScope and shardScope is, appScope doesn't have shardId as a dimension, + * therefore all data added to appScope, although from different shard consumer, will be sent to the same metric, + * which is the app-level MillsBehindLatest metric. + */ + final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); + final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() - .ifPresent(streamId -> MetricsUtil.addStreamId(ShardScope, StreamIdentifier.multiStreamInstance(streamId))); - MetricsUtil.addShardId(ShardScope, shardInfo.shardId()); + .ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId))); + MetricsUtil.addShardId(shardScope, shardInfo.shardId()); long startTimeMillis = System.currentTimeMillis(); boolean success = false; try { - ShardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); - ShardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); + shardScope.addData(RECORDS_PROCESSED_METRIC, 0, StandardUnit.COUNT, MetricsLevel.SUMMARY); + shardScope.addData(DATA_BYTES_PROCESSED_METRIC, 0, StandardUnit.BYTES, MetricsLevel.SUMMARY); Exception exception = null; try { if (processRecordsInput.millisBehindLatest() != null) { - ShardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + shardScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); - AppScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), + appScope.addData(MILLIS_BEHIND_LATEST_METRIC, processRecordsInput.millisBehindLatest(), StandardUnit.MILLISECONDS, MetricsLevel.SUMMARY); } @@ -146,11 +151,11 @@ public TaskResult call() { } if (!records.isEmpty()) { - ShardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + shardScope.addData(RECORDS_PROCESSED_METRIC, records.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); } recordProcessorCheckpointer.largestPermittedCheckpointValue(filterAndGetMaxExtendedSequenceNumber( - ShardScope, records, recordProcessorCheckpointer.lastCheckpointValue(), + shardScope, records, recordProcessorCheckpointer.lastCheckpointValue(), recordProcessorCheckpointer.largestPermittedCheckpointValue())); if (shouldCallProcessRecords(records)) { @@ -169,9 +174,9 @@ public TaskResult call() { } return new TaskResult(exception); } finally { - MetricsUtil.addSuccessAndLatency(ShardScope, success, startTimeMillis, MetricsLevel.SUMMARY); - MetricsUtil.endScope(ShardScope); - MetricsUtil.endScope(AppScope); + MetricsUtil.addSuccessAndLatency(shardScope, success, startTimeMillis, MetricsLevel.SUMMARY); + MetricsUtil.endScope(shardScope); + MetricsUtil.endScope(appScope); } } From e22079123cce75c5840a1e33ecfae85ae48f1526 Mon Sep 17 00:00:00 2001 From: Qilin Jin Date: Thu, 2 Dec 2021 10:52:21 -0800 Subject: [PATCH 4/4] change the operation name for the metric --- .../java/software/amazon/kinesis/lifecycle/ProcessTask.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index cd460e3e4..f05efb91f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -45,7 +45,7 @@ @KinesisClientInternalApi public class ProcessTask implements ConsumerTask { private static final String PROCESS_TASK_OPERATION = "ProcessTask"; - private static final String APPLICATION_LEVEL_METRICS = "ApplicationLevelMetrics"; + private static final String APPLICATION_TRACKER_OPERATION = "ApplicationTracker"; private static final String DATA_BYTES_PROCESSED_METRIC = "DataBytesProcessed"; private static final String RECORDS_PROCESSED_METRIC = "RecordsProcessed"; private static final String RECORD_PROCESSOR_PROCESS_RECORDS_METRIC = "RecordProcessor.processRecords"; @@ -118,7 +118,7 @@ public TaskResult call() { * therefore all data added to appScope, although from different shard consumer, will be sent to the same metric, * which is the app-level MillsBehindLatest metric. */ - final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_LEVEL_METRICS); + final MetricsScope appScope = MetricsUtil.createMetricsWithOperation(metricsFactory, APPLICATION_TRACKER_OPERATION); final MetricsScope shardScope = MetricsUtil.createMetricsWithOperation(metricsFactory, PROCESS_TASK_OPERATION); shardInfo.streamIdentifierSerOpt() .ifPresent(streamId -> MetricsUtil.addStreamId(shardScope, StreamIdentifier.multiStreamInstance(streamId)));