Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 0 additions & 80 deletions streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,86 +97,6 @@ Sensor addRateTotalSensor(final String scopeName,
final Sensor.RecordingLevel recordingLevel,
final String... tags);

/**
* Add a latency and throughput sensor for a specific operation, which will include the following sensors:
* <ol>
* <li>average latency</li>
* <li>max latency</li>
* <li>throughput (num.operations / time unit)</li>
* </ol>
* Also create a parent sensor with the same metrics that aggregates all entities with the same operation under the
* same scope if it has not been created.
*
* @param scopeName name of the scope, could be the type of the state store, etc.
* @param entityName name of the entity, could be the name of the state store instance, etc.
* @param operationName name of the operation, could be get / put / delete / etc.
* @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor.
* @param tags additional tags of the sensor
* @return The added sensor.
* @deprecated since 2.5. Use {@link #addLatencyRateTotalSensor(String, String, String, Sensor.RecordingLevel, String...) addLatencyRateTotalSensor()}
* instead.
*/
@Deprecated
Sensor addLatencyAndThroughputSensor(final String scopeName,
final String entityName,
final String operationName,
final Sensor.RecordingLevel recordingLevel,
final String... tags);

/**
* Record the given latency value of the sensor.
* If the passed sensor includes throughput metrics, e.g., when created by the
* {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} method, then the
* throughput metrics will also be recorded from this event.
*
* @param sensor sensor whose latency we are recording.
* @param startNs start of measurement time in nanoseconds.
* @param endNs end of measurement time in nanoseconds.
* @deprecated since 2.5. Use {@link Sensor#record(double) Sensor#record()} instead.
*/
@Deprecated
void recordLatency(final Sensor sensor,
final long startNs,
final long endNs);

/**
* Add a throughput sensor for a specific operation:
* <ol>
* <li>throughput (num.operations / time unit)</li>
* </ol>
* Also create a parent sensor with the same metrics that aggregates all entities with the same operation under the
* same scope if it has not been created.
* This sensor is a strict subset of the sensors created by
* {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)}.
*
* @param scopeName name of the scope, could be the type of the state store, etc.
* @param entityName name of the entity, could be the name of the state store instance, etc.
* @param operationName name of the operation, could be get / put / delete / etc.
* @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor.
* @param tags additional tags of the sensor
* @return The added sensor.
* @deprecated since 2.5. Use {@link #addRateTotalSensor(String, String, String, Sensor.RecordingLevel, String...)
* addRateTotalSensor()} instead.
*/
@Deprecated
Sensor addThroughputSensor(final String scopeName,
final String entityName,
final String operationName,
final Sensor.RecordingLevel recordingLevel,
final String... tags);

/**
* Record the throughput value of a sensor.
*
* @param sensor add Sensor whose throughput we are recording
* @param value throughput value
* @deprecated since 2.5. Use {@link Sensor#record() Sensor#record()} instead.
*/
@Deprecated
void recordThroughput(final Sensor sensor,
final long value);


/**
* Generic method to create a sensor.
* Note that for most cases it is advisable to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,18 +497,6 @@ public Sensor addSensor(final String name, final Sensor.RecordingLevel recording
return Collections.unmodifiableMap(this.metrics.metrics());
}

@Override
@Deprecated
public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
sensor.record(endNs - startNs);
}

@Override
@Deprecated
public void recordThroughput(final Sensor sensor, final long value) {
sensor.record(value);
}

private Map<String, String> customizedTags(final String threadId,
final String scopeName,
final String entityName,
Expand Down Expand Up @@ -585,79 +573,6 @@ public Sensor addRateTotalSensor(final String scopeName,
);
}

/**
* @throws IllegalArgumentException if tags is not constructed in key-value pairs
*/
@Deprecated
@Override
public Sensor addLatencyAndThroughputSensor(final String scopeName,
final String entityName,
final String operationName,
final Sensor.RecordingLevel recordingLevel,
final String... tags) {
final String group = groupNameFromScope(scopeName);

final String threadId = Thread.currentThread().getName();
final Map<String, String> tagMap = customizedTags(threadId, scopeName, entityName, tags);
final Map<String, String> allTagMap = customizedTags(threadId, scopeName, "all", tags);

// first add the global operation metrics if not yet, with the global tags only
final Sensor parent = metrics.sensor(externalParentSensorName(threadId, operationName), recordingLevel);
addAvgAndMaxLatencyToSensor(parent, group, allTagMap, operationName);
addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName);

// add the operation metrics with additional tags
final Sensor sensor = metrics.sensor(
externalChildSensorName(threadId, operationName, entityName),
recordingLevel,
parent
);
addAvgAndMaxLatencyToSensor(sensor, group, tagMap, operationName);
addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName);

parentSensors.put(sensor, parent);

return sensor;

}

/**
* @throws IllegalArgumentException if tags is not constructed in key-value pairs
*/
@Deprecated
@Override
public Sensor addThroughputSensor(final String scopeName,
final String entityName,
final String operationName,
final Sensor.RecordingLevel recordingLevel,
final String... tags) {
final String group = groupNameFromScope(scopeName);

final String threadId = Thread.currentThread().getName();
final Map<String, String> tagMap = customizedTags(threadId, scopeName, entityName, tags);
final Map<String, String> allTagMap = customizedTags(threadId, scopeName, "all", tags);

// first add the global operation metrics if not yet, with the global tags only
final Sensor parent = metrics.sensor(
externalParentSensorName(threadId, operationName),
recordingLevel
);
addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName);

// add the operation metrics with additional tags
final Sensor sensor = metrics.sensor(
externalChildSensorName(threadId, operationName, entityName),
recordingLevel,
parent
);
addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName);

parentSensors.put(sensor, parent);

return sensor;

}

private String externalChildSensorName(final String threadId, final String operationName, final String entityName) {
return SENSOR_EXTERNAL_LABEL + SENSOR_PREFIX_DELIMITER + threadId
+ SENSOR_PREFIX_DELIMITER + SENSOR_ENTITY_LABEL + SENSOR_PREFIX_DELIMITER + entityName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -769,39 +769,37 @@ public void testMultiLevelSensorRemoval() {
}

@Test
@SuppressWarnings("deprecation")
public void testLatencyMetrics() {
final int defaultMetrics = streamsMetrics.metrics().size();

final String scope = "scope";
final String entity = "entity";
final String operation = "put";

final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, RecordingLevel.DEBUG);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also remove the @SuppressWarnings("deprecation") at the top of the method.

final Sensor sensor1 = streamsMetrics.addLatencyRateTotalSensor(scope, entity, operation, RecordingLevel.DEBUG);

// 2 meters and 4 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
final int otherMetricsCount = 4;
assertEquals(defaultMetrics + meterMetricsCount * 2 + otherMetricsCount, streamsMetrics.metrics().size());
final int otherMetricsCount = 2; // Latency-max and Latency-avg
// 2 meters and 2 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
assertEquals(defaultMetrics + meterMetricsCount + otherMetricsCount, streamsMetrics.metrics().size());

streamsMetrics.removeSensor(sensor1);
assertEquals(defaultMetrics, streamsMetrics.metrics().size());
}

@Test
@SuppressWarnings("deprecation")
public void testThroughputMetrics() {
final int defaultMetrics = streamsMetrics.metrics().size();

final String scope = "scope";
final String entity = "entity";
final String operation = "put";

final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, RecordingLevel.DEBUG);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also remove the @SuppressWarnings("deprecation") at the top of the method.

final Sensor sensor1 = streamsMetrics.addRateTotalSensor(scope, entity, operation, RecordingLevel.DEBUG);

final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
// 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
assertEquals(defaultMetrics + meterMetricsCount * 2, streamsMetrics.metrics().size());
assertEquals(defaultMetrics + meterMetricsCount, streamsMetrics.metrics().size());

streamsMetrics.removeSensor(sensor1);
assertEquals(defaultMetrics, streamsMetrics.metrics().size());
Expand Down