diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java index 1945fffc7d3e1..cbf21698173c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java @@ -97,86 +97,6 @@ Sensor addRateTotalSensor(final String scopeName, final Sensor.RecordingLevel recordingLevel, final String... tags); - /** - * Add a latency and throughput sensor for a specific operation, which will include the following sensors: - *
    - *
  1. average latency
  2. - *
  3. max latency
  4. - *
  5. throughput (num.operations / time unit)
  6. - *
- * Also create a parent sensor with the same metrics that aggregates all entities with the same operation under the - * same scope if it has not been created. - * - * @param scopeName name of the scope, could be the type of the state store, etc. - * @param entityName name of the entity, could be the name of the state store instance, etc. - * @param operationName name of the operation, could be get / put / delete / etc. - * @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor. - * @param tags additional tags of the sensor - * @return The added sensor. - * @deprecated since 2.5. Use {@link #addLatencyRateTotalSensor(String, String, String, Sensor.RecordingLevel, String...) addLatencyRateTotalSensor()} - * instead. - */ - @Deprecated - Sensor addLatencyAndThroughputSensor(final String scopeName, - final String entityName, - final String operationName, - final Sensor.RecordingLevel recordingLevel, - final String... tags); - - /** - * Record the given latency value of the sensor. - * If the passed sensor includes throughput metrics, e.g., when created by the - * {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)} method, then the - * throughput metrics will also be recorded from this event. - * - * @param sensor sensor whose latency we are recording. - * @param startNs start of measurement time in nanoseconds. - * @param endNs end of measurement time in nanoseconds. - * @deprecated since 2.5. Use {@link Sensor#record(double) Sensor#record()} instead. - */ - @Deprecated - void recordLatency(final Sensor sensor, - final long startNs, - final long endNs); - - /** - * Add a throughput sensor for a specific operation: - *
    - *
  1. throughput (num.operations / time unit)
  2. - *
- * Also create a parent sensor with the same metrics that aggregates all entities with the same operation under the - * same scope if it has not been created. - * This sensor is a strict subset of the sensors created by - * {@link #addLatencyAndThroughputSensor(String, String, String, Sensor.RecordingLevel, String...)}. - * - * @param scopeName name of the scope, could be the type of the state store, etc. - * @param entityName name of the entity, could be the name of the state store instance, etc. - * @param operationName name of the operation, could be get / put / delete / etc. - * @param recordingLevel the recording level (e.g., INFO or DEBUG) for this sensor. - * @param tags additional tags of the sensor - * @return The added sensor. - * @deprecated since 2.5. Use {@link #addRateTotalSensor(String, String, String, Sensor.RecordingLevel, String...) - * addRateTotalSensor()} instead. - */ - @Deprecated - Sensor addThroughputSensor(final String scopeName, - final String entityName, - final String operationName, - final Sensor.RecordingLevel recordingLevel, - final String... tags); - - /** - * Record the throughput value of a sensor. - * - * @param sensor add Sensor whose throughput we are recording - * @param value throughput value - * @deprecated since 2.5. Use {@link Sensor#record() Sensor#record()} instead. - */ - @Deprecated - void recordThroughput(final Sensor sensor, - final long value); - - /** * Generic method to create a sensor. * Note that for most cases it is advisable to use diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java index e4e370888a08b..f251c2a65cce2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java @@ -497,18 +497,6 @@ public Sensor addSensor(final String name, final Sensor.RecordingLevel recording return Collections.unmodifiableMap(this.metrics.metrics()); } - @Override - @Deprecated - public void recordLatency(final Sensor sensor, final long startNs, final long endNs) { - sensor.record(endNs - startNs); - } - - @Override - @Deprecated - public void recordThroughput(final Sensor sensor, final long value) { - sensor.record(value); - } - private Map customizedTags(final String threadId, final String scopeName, final String entityName, @@ -585,79 +573,6 @@ public Sensor addRateTotalSensor(final String scopeName, ); } - /** - * @throws IllegalArgumentException if tags is not constructed in key-value pairs - */ - @Deprecated - @Override - public Sensor addLatencyAndThroughputSensor(final String scopeName, - final String entityName, - final String operationName, - final Sensor.RecordingLevel recordingLevel, - final String... tags) { - final String group = groupNameFromScope(scopeName); - - final String threadId = Thread.currentThread().getName(); - final Map tagMap = customizedTags(threadId, scopeName, entityName, tags); - final Map allTagMap = customizedTags(threadId, scopeName, "all", tags); - - // first add the global operation metrics if not yet, with the global tags only - final Sensor parent = metrics.sensor(externalParentSensorName(threadId, operationName), recordingLevel); - addAvgAndMaxLatencyToSensor(parent, group, allTagMap, operationName); - addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName); - - // add the operation metrics with additional tags - final Sensor sensor = metrics.sensor( - externalChildSensorName(threadId, operationName, entityName), - recordingLevel, - parent - ); - addAvgAndMaxLatencyToSensor(sensor, group, tagMap, operationName); - addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName); - - parentSensors.put(sensor, parent); - - return sensor; - - } - - /** - * @throws IllegalArgumentException if tags is not constructed in key-value pairs - */ - @Deprecated - @Override - public Sensor addThroughputSensor(final String scopeName, - final String entityName, - final String operationName, - final Sensor.RecordingLevel recordingLevel, - final String... tags) { - final String group = groupNameFromScope(scopeName); - - final String threadId = Thread.currentThread().getName(); - final Map tagMap = customizedTags(threadId, scopeName, entityName, tags); - final Map allTagMap = customizedTags(threadId, scopeName, "all", tags); - - // first add the global operation metrics if not yet, with the global tags only - final Sensor parent = metrics.sensor( - externalParentSensorName(threadId, operationName), - recordingLevel - ); - addInvocationRateAndCountToSensor(parent, group, allTagMap, operationName); - - // add the operation metrics with additional tags - final Sensor sensor = metrics.sensor( - externalChildSensorName(threadId, operationName, entityName), - recordingLevel, - parent - ); - addInvocationRateAndCountToSensor(sensor, group, tagMap, operationName); - - parentSensors.put(sensor, parent); - - return sensor; - - } - private String externalChildSensorName(final String threadId, final String operationName, final String entityName) { return SENSOR_EXTERNAL_LABEL + SENSOR_PREFIX_DELIMITER + threadId + SENSOR_PREFIX_DELIMITER + SENSOR_ENTITY_LABEL + SENSOR_PREFIX_DELIMITER + entityName diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java index 214696b8025d3..fdb96db6868e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.java @@ -769,7 +769,6 @@ public void testMultiLevelSensorRemoval() { } @Test - @SuppressWarnings("deprecation") public void testLatencyMetrics() { final int defaultMetrics = streamsMetrics.metrics().size(); @@ -777,19 +776,18 @@ public void testLatencyMetrics() { final String entity = "entity"; final String operation = "put"; - final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, RecordingLevel.DEBUG); + final Sensor sensor1 = streamsMetrics.addLatencyRateTotalSensor(scope, entity, operation, RecordingLevel.DEBUG); - // 2 meters and 4 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total - final int otherMetricsCount = 4; - assertEquals(defaultMetrics + meterMetricsCount * 2 + otherMetricsCount, streamsMetrics.metrics().size()); + final int otherMetricsCount = 2; // Latency-max and Latency-avg + // 2 meters and 2 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor + assertEquals(defaultMetrics + meterMetricsCount + otherMetricsCount, streamsMetrics.metrics().size()); streamsMetrics.removeSensor(sensor1); assertEquals(defaultMetrics, streamsMetrics.metrics().size()); } @Test - @SuppressWarnings("deprecation") public void testThroughputMetrics() { final int defaultMetrics = streamsMetrics.metrics().size(); @@ -797,11 +795,11 @@ public void testThroughputMetrics() { final String entity = "entity"; final String operation = "put"; - final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, RecordingLevel.DEBUG); + final Sensor sensor1 = streamsMetrics.addRateTotalSensor(scope, entity, operation, RecordingLevel.DEBUG); final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total // 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor - assertEquals(defaultMetrics + meterMetricsCount * 2, streamsMetrics.metrics().size()); + assertEquals(defaultMetrics + meterMetricsCount, streamsMetrics.metrics().size()); streamsMetrics.removeSensor(sensor1); assertEquals(defaultMetrics, streamsMetrics.metrics().size());