Skip to content

Commit

Permalink
Fixing issue with NullMetrics warning messages (#284)
Browse files Browse the repository at this point in the history
Fixes #48 

* Fixing issue with NullMetrics warning messages when trying to checkpoint on a separate thread.

* Adding testing to validate the MetricsScope setting during checkpoiniting.
  • Loading branch information
sahilpalvia authored and pfifer committed Jan 22, 2018
1 parent 71124e4 commit e65e563
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;

import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -50,6 +53,8 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
private SequenceNumberValidator sequenceNumberValidator;

private ExtendedSequenceNumber sequenceNumberAtShardEnd;

private IMetricsFactory metricsFactory;

/**
* Only has package level access, since only the Amazon Kinesis Client Library should be creating these.
Expand All @@ -59,10 +64,12 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
*/
RecordProcessorCheckpointer(ShardInfo shardInfo,
ICheckpoint checkpoint,
SequenceNumberValidator validator) {
SequenceNumberValidator validator,
IMetricsFactory metricsFactory) {
this.shardInfo = shardInfo;
this.checkpoint = checkpoint;
this.sequenceNumberValidator = validator;
this.metricsFactory = metricsFactory;
}

/**
Expand Down Expand Up @@ -283,21 +290,33 @@ void advancePosition(ExtendedSequenceNumber extendedSequenceNumber)
// just checkpoint at SHARD_END
checkpointToRecord = ExtendedSequenceNumber.SHARD_END;
}

boolean unsetMetrics = false;
// Don't checkpoint a value we already successfully checkpointed
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
+ " checkpoint to " + checkpointToRecord);
try {
if (!MetricsHelper.isMetricsScopePresent()) {
MetricsHelper.setMetricsScope(new ThreadSafeMetricsDelegatingScope(metricsFactory.createMetrics()));
unsetMetrics = true;
}
if (extendedSequenceNumber != null && !extendedSequenceNumber.equals(lastCheckpointValue)) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting " + shardInfo.getShardId() + ", token " + shardInfo.getConcurrencyToken()
+ " checkpoint to " + checkpointToRecord);
}
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken());
lastCheckpointValue = checkpointToRecord;
} catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) {
throw e;
} catch (KinesisClientLibException e) {
LOG.warn("Caught exception setting checkpoint.", e);
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
}
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointToRecord, shardInfo.getConcurrencyToken());
lastCheckpointValue = checkpointToRecord;
} catch (ThrottlingException | ShutdownException | InvalidStateException
| KinesisClientLibDependencyException e) {
throw e;
} catch (KinesisClientLibException e) {
LOG.warn("Caught exception setting checkpoint.", e);
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
}
} finally {
if (unsetMetrics) {
MetricsHelper.unsetMetricsScope();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing())),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
metricsFactory),
leaseManager,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,22 @@ public static IMetricsScope startScope(IMetricsFactory factory, String operation
* @param scope
*/
public static void setMetricsScope(IMetricsScope scope) {
if (currentScope.get() != null) {
if (isMetricsScopePresent()) {
throw new RuntimeException(String.format(
"Metrics scope is already set for the current thread %s", Thread.currentThread().getName()));
}
currentScope.set(scope);
}

/**
* Checks if current metricsscope is present or not.
*
* @return true if metrics scope is present, else returns false
*/
public static boolean isMetricsScopePresent() {
return currentScope.get() != null;
}

/**
* Unsets the metrics scope for the current thread.
*/
Expand Down
Loading

0 comments on commit e65e563

Please sign in to comment.