diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index aafbfcff6..82514b876 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -163,16 +163,19 @@ public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final Shard final Set createdLeases = new HashSet<>(); for (Lease lease : newLeasesToCreate) { - long startTime = System.currentTimeMillis(); + final long startTime = System.currentTimeMillis(); boolean success = false; try { if(leaseRefresher.createLeaseIfNotExists(lease)) { createdLeases.add(lease); } success = true; - } - finally { + } finally { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); + if (lease.checkpoint() != null) { + final String metricName = lease.checkpoint().isSentinelCheckpoint() ? lease.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"; + MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED); + } } } log.info("{} - Newly created leases {}: {}", streamIdentifier, createdLeases.size(), createdLeases); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index c436f38a8..0322c0e2f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -181,7 +181,7 @@ private void takeShardEndAction(Lease currentShardLease, + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); } if (!CollectionUtils.isNullOrEmpty(childShards)) { - createLeasesForChildShardsIfNotExist(); + createLeasesForChildShardsIfNotExist(scope); updateLeaseWithChildShards(currentShardLease); } final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, @@ -239,7 +239,7 @@ private void throwOnApplicationException(Runnable action, MetricsScope metricsSc } } - private void createLeasesForChildShardsIfNotExist() + private void createLeasesForChildShardsIfNotExist(MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException { // For child shard resulted from merge of two parent shards, verify if both the parents are either present or // not present in the lease table before creating the lease entry. @@ -272,7 +272,18 @@ private void createLeasesForChildShardsIfNotExist() if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) { log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey); final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); - leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); + final long startTime = System.currentTimeMillis(); + boolean success = false; + try { + leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); + success = true; + } finally { + MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); + if (leaseToCreate.checkpoint() != null) { + final String metricName = leaseToCreate.checkpoint().isSentinelCheckpoint() ? leaseToCreate.checkpoint().sequenceNumber() : "SEQUENCE_NUMBER"; + MetricsUtil.addSuccess(scope, "CreateLease_" + metricName, true, MetricsLevel.DETAILED); + } + } log.info("{} - Shard {}: Created child shard lease: {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseToCreate); }