From 3f592b171acc0f538efdeda4d99e84444af0b30d Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Tue, 11 Jan 2022 21:50:44 -0500 Subject: [PATCH 1/5] InstrumentedStreams for input & output streams Track bytes read/written via meter and throughput histogram --- .../tritium/io/ForwardingInputStream.java | 92 +++++++++++++++++++ .../tritium/io/ForwardingOutputStream.java | 81 ++++++++++++++++ .../tritium/io/InstrumentedInputStream.java | 52 +++++++++++ .../tritium/io/InstrumentedOutputStream.java | 49 ++++++++++ .../tritium/io/InstrumentedStreams.java | 52 +++++++++++ .../tritium/io/InstrumentedStreamsTest.java | 86 +++++++++++++++++ 6 files changed, 412 insertions(+) create mode 100644 tritium-lib/src/main/java/com/palantir/tritium/io/ForwardingInputStream.java create mode 100644 tritium-lib/src/main/java/com/palantir/tritium/io/ForwardingOutputStream.java create mode 100644 tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedInputStream.java create mode 100644 tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedOutputStream.java create mode 100644 tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedStreams.java create mode 100644 tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java diff --git a/tritium-lib/src/main/java/com/palantir/tritium/io/ForwardingInputStream.java b/tritium-lib/src/main/java/com/palantir/tritium/io/ForwardingInputStream.java new file mode 100644 index 000000000..27d726302 --- /dev/null +++ b/tritium-lib/src/main/java/com/palantir/tritium/io/ForwardingInputStream.java @@ -0,0 +1,92 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tritium.io; + +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; + +abstract class ForwardingInputStream extends FilterInputStream { + private static final SafeLogger log = SafeLoggerFactory.get(ForwardingInputStream.class); + + protected ForwardingInputStream(InputStream in) { + super(in); + } + + protected final InputStream input() { + return super.in; + } + + /** + * Hook to handle number of bytes that will attempt to read. + * Default implementation logs the number of bytes that will attempt to read. + * @param bytesToRead number of bytes that will attempt to read + */ + protected void before(long bytesToRead) { + if (log.isTraceEnabled()) { + log.trace("Attempting to read", SafeArg.of("bytesToRead", bytesToRead)); + } + } + + /** + * Hook to handle number of bytes that were actually read, or -1 if end-of-stream. + * Default implementation logs the number of bytes read. + * @param bytesRead number of bytes that were read, or -1 if end-of-stream + */ + protected void after(long bytesRead) { + if (log.isDebugEnabled()) { + log.debug("Read", SafeArg.of("bytesRead", bytesRead)); + } + } + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + Objects.checkFromIndexSize(off, len, bytes.length); + before(len); + int bytesRead = input().read(bytes, off, len); + after(bytesRead); + return bytesRead; + } + + @Override + public int read(byte[] bytes) throws IOException { + before(bytes.length); + int bytesRead = input().read(bytes); + after(bytesRead); + return bytesRead; + } + + @Override + public int read() throws IOException { + before(1); + int bytesRead = input().read(); + after(bytesRead); + return bytesRead; + } + + @Override + public int readNBytes(byte[] bytes, int off, int len) throws IOException { + before(len); + int bytesRead = input().readNBytes(bytes, off, len); + after(bytesRead); + return bytesRead; + } +} diff --git a/tritium-lib/src/main/java/com/palantir/tritium/io/ForwardingOutputStream.java b/tritium-lib/src/main/java/com/palantir/tritium/io/ForwardingOutputStream.java new file mode 100644 index 000000000..68ebf6491 --- /dev/null +++ b/tritium-lib/src/main/java/com/palantir/tritium/io/ForwardingOutputStream.java @@ -0,0 +1,81 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tritium.io; + +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.logger.SafeLogger; +import com.palantir.logsafe.logger.SafeLoggerFactory; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; + +abstract class ForwardingOutputStream extends FilterOutputStream { + private static final SafeLogger log = SafeLoggerFactory.get(ForwardingOutputStream.class); + + protected ForwardingOutputStream(OutputStream in) { + super(in); + } + + protected final OutputStream output() { + return super.out; + } + + /** + * Hook to handle number of bytes that will attempt to write. + * Default implementation logs the number of bytes that will attempt to write. + * @param bytesToWrite number of bytes that will attempt to write + */ + protected void before(long bytesToWrite) { + if (log.isTraceEnabled()) { + log.trace("Attempting to write", SafeArg.of("bytesToWrite", bytesToWrite)); + } + } + + /** + * Hook to handle number of bytes that were actually written. + * Default implementation logs the number of bytes written. + * @param bytesWritten number of bytes that were written + */ + protected void after(long bytesWritten) { + if (log.isDebugEnabled()) { + log.debug("Wrote", SafeArg.of("bytesWritten", bytesWritten)); + } + } + + @Override + public void write(byte[] bytes, int off, int len) throws IOException { + Objects.checkFromIndexSize(off, len, bytes.length); + before(len); + output().write(bytes, off, len); + after(len); + } + + @Override + public void write(byte[] bytes) throws IOException { + before(bytes.length); + output().write(bytes); + after(bytes.length); + } + + @Override + public void write(int value) throws IOException { + before(1); + output().write(value); + after(1); + } +} diff --git a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedInputStream.java b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedInputStream.java new file mode 100644 index 000000000..361838f96 --- /dev/null +++ b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedInputStream.java @@ -0,0 +1,52 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tritium.io; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.palantir.logsafe.Preconditions; +import java.io.InputStream; + +final class InstrumentedInputStream extends ForwardingInputStream { + private final Meter bytes; + private final Histogram throughput; + private long start; + + InstrumentedInputStream(InputStream in, Meter bytes, Histogram throughput) { + super(in); + this.bytes = Preconditions.checkNotNull(bytes, "bytes"); + this.throughput = Preconditions.checkNotNull(throughput, "throughput"); + } + + @Override + protected void before(long bytesToRead) { + Preconditions.checkArgument(bytesToRead >= 0); + start = System.nanoTime(); + super.before(bytesToRead); + } + + @Override + protected void after(long bytesRead) { + if (bytesRead > -1) { + double elapsedSeconds = (System.nanoTime() - start) / 1_000_000_000.0; + long bytesPerSecond = Math.round(bytesRead / elapsedSeconds); + throughput.update(bytesPerSecond); + bytes.mark(bytesRead); + } + super.after(bytesRead); + } +} diff --git a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedOutputStream.java b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedOutputStream.java new file mode 100644 index 000000000..838bed9a7 --- /dev/null +++ b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedOutputStream.java @@ -0,0 +1,49 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tritium.io; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.palantir.logsafe.Preconditions; +import java.io.OutputStream; + +final class InstrumentedOutputStream extends ForwardingOutputStream { + private final Meter bytes; + private final Histogram throughput; + private long start; + + InstrumentedOutputStream(OutputStream in, Meter bytes, Histogram throughput) { + super(in); + this.bytes = Preconditions.checkNotNull(bytes, "bytes"); + this.throughput = Preconditions.checkNotNull(throughput, "throughput"); + } + + @Override + protected void before(long bytesToWrite) { + start = System.nanoTime(); + super.before(bytesToWrite); + } + + @Override + protected void after(long bytesWritten) { + double elapsedSeconds = (System.nanoTime() - start) / 1_000_000_000.0; + long bytesPerSecond = Math.round(bytesWritten / elapsedSeconds); + throughput.update(bytesPerSecond); + this.bytes.mark(bytesWritten); + super.after(bytesWritten); + } +} diff --git a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedStreams.java b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedStreams.java new file mode 100644 index 000000000..2c344ff43 --- /dev/null +++ b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedStreams.java @@ -0,0 +1,52 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tritium.io; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import java.io.InputStream; +import java.io.OutputStream; + +public final class InstrumentedStreams { + private InstrumentedStreams() {} + + /** + * Instruments the provided stream to provide a meter tracking bytes read, + * and histogram tracking bytes per second. + * + * @param in input + * @param bytes bytes read + * @param throughput bytes read per second + * @return instrumented input stream + */ + public static InputStream input(InputStream in, Meter bytes, Histogram throughput) { + return new InstrumentedInputStream(in, bytes, throughput); + } + + /** + * Instruments the provided stream to provide a meter tracking bytes written, + * and histogram tracking bytes per second. + * + * @param out output + * @param bytes bytes read + * @param throughput bytes read per second + * @return instrumented output stream + */ + public static OutputStream output(OutputStream out, Meter bytes, Histogram throughput) { + return new InstrumentedOutputStream(out, bytes, throughput); + } +} diff --git a/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java b/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java new file mode 100644 index 000000000..e28fdab98 --- /dev/null +++ b/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java @@ -0,0 +1,86 @@ +/* + * (c) Copyright 2022 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.tritium.io; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.google.common.io.ByteStreams; +import com.palantir.tritium.metrics.MetricRegistries; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.junit.jupiter.api.Test; + +class InstrumentedStreamsTest { + + @SuppressWarnings("SystemOut") + @Test + void copy() throws IOException { + byte[] bytes = new byte[100 * 1024 * 1024]; + MetricRegistry registry = MetricRegistries.createWithHdrHistogramReservoirs(); + + Meter bytesReadMeter = registry.meter("bytes-read"); + Histogram readThroughput = registry.histogram("bytes-read-per-second"); + + Meter bytesWrittenMeter = registry.meter("bytes-written"); + Histogram writeThroughput = registry.histogram("bytes-written-per-second"); + + int iterations = 10; + for (long i = 1; i <= iterations; i++) { + try (ConsoleReporter reporter = + ConsoleReporter.forRegistry(registry).build(); + InputStream input = new ByteArrayInputStream(bytes); + OutputStream output = new ByteArrayOutputStream(bytes.length); + InputStream instrumentedInputStream = + InstrumentedStreams.input(input, bytesReadMeter, readThroughput); + OutputStream instrumentedOutputStream = + InstrumentedStreams.output(output, bytesWrittenMeter, writeThroughput)) { + assertThat(ByteStreams.copy(instrumentedInputStream, instrumentedOutputStream)) + .isEqualTo(bytes.length); + assertThat(bytesReadMeter.getCount()).isNotZero().isEqualTo(i * bytes.length); + assertThat(bytesWrittenMeter.getCount()).isNotZero().isEqualTo(i * bytes.length); + reporter.report(); + } + } + + assertThat(bytesReadMeter.getCount()).isNotZero().isEqualTo(iterations * bytes.length); + assertThat(readThroughput.getCount()).isGreaterThanOrEqualTo(iterations); + assertThat(readThroughput.getSnapshot()).satisfies(snapshot -> { + assertThat(snapshot.getMin()).isNotZero(); + assertThat(snapshot.getMean()).isNotZero(); + assertThat(snapshot.getMedian()).isNotZero(); + assertThat(snapshot.getMax()).isNotZero(); + System.err.printf("Mean read throughput %.3g GiB/sec %n", snapshot.getMean() / (1024.0 * 1024.0 * 1024.0)); + }); + + assertThat(bytesWrittenMeter.getCount()).isNotZero().isEqualTo(iterations * bytes.length); + assertThat(writeThroughput.getCount()).isGreaterThanOrEqualTo(iterations); + assertThat(writeThroughput.getSnapshot()).satisfies(snapshot -> { + assertThat(snapshot.getMin()).isNotZero(); + assertThat(snapshot.getMean()).isNotZero(); + assertThat(snapshot.getMedian()).isNotZero(); + assertThat(snapshot.getMax()).isNotZero(); + System.err.printf("Mean write throughput %.3g GiB/sec %n", snapshot.getMean() / (1024.0 * 1024.0 * 1024.0)); + }); + } +} From b9ccdcb8639205aaedfce6a2301fa82b50a71524 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Tue, 11 Jan 2022 22:17:50 -0500 Subject: [PATCH 2/5] Update InstrumentedStreamsTest.java --- .../java/com/palantir/tritium/io/InstrumentedStreamsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java b/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java index e28fdab98..1884e834e 100644 --- a/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java +++ b/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java @@ -33,8 +33,8 @@ class InstrumentedStreamsTest { - @SuppressWarnings("SystemOut") @Test + @SuppressWarnings("SystemOut") void copy() throws IOException { byte[] bytes = new byte[100 * 1024 * 1024]; MetricRegistry registry = MetricRegistries.createWithHdrHistogramReservoirs(); From bf9ce7cf75800af2d550294ace4790f64755835d Mon Sep 17 00:00:00 2001 From: svc-changelog Date: Wed, 12 Jan 2022 03:18:04 +0000 Subject: [PATCH 3/5] Add generated changelog entries --- changelog/@unreleased/pr-1314.v2.yml | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 changelog/@unreleased/pr-1314.v2.yml diff --git a/changelog/@unreleased/pr-1314.v2.yml b/changelog/@unreleased/pr-1314.v2.yml new file mode 100644 index 000000000..0dfabdb50 --- /dev/null +++ b/changelog/@unreleased/pr-1314.v2.yml @@ -0,0 +1,8 @@ +type: improvement +improvement: + description: |- + InstrumentedStreams for input & output streams + + Track bytes read/written via meter and throughput histogram + links: + - https://github.com/palantir/tritium/pull/1314 From 883de340ea0a817be7a1b0a84a818ed580c6f6fe Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Thu, 13 Jan 2022 13:43:00 -0500 Subject: [PATCH 4/5] I/O metrics use tagged metric schema --- tritium-lib/build.gradle | 1 + .../tritium/io/InstrumentedInputStream.java | 16 +--- .../tritium/io/InstrumentedOutputStream.java | 15 +--- .../tritium/io/InstrumentedStreams.java | 28 +++---- tritium-lib/src/main/metrics/metrics.yml | 15 ++++ .../tritium/io/InstrumentedStreamsTest.java | 84 ++++++++++--------- 6 files changed, 78 insertions(+), 81 deletions(-) create mode 100644 tritium-lib/src/main/metrics/metrics.yml diff --git a/tritium-lib/build.gradle b/tritium-lib/build.gradle index f2e4115ef..0fd0ccad9 100644 --- a/tritium-lib/build.gradle +++ b/tritium-lib/build.gradle @@ -1,4 +1,5 @@ apply plugin: 'com.palantir.external-publish-jar' +apply plugin: 'com.palantir.metric-schema' apply plugin: 'com.palantir.revapi' dependencies { diff --git a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedInputStream.java b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedInputStream.java index 361838f96..75fc2df33 100644 --- a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedInputStream.java +++ b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedInputStream.java @@ -16,35 +16,21 @@ package com.palantir.tritium.io; -import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.palantir.logsafe.Preconditions; import java.io.InputStream; final class InstrumentedInputStream extends ForwardingInputStream { private final Meter bytes; - private final Histogram throughput; - private long start; - InstrumentedInputStream(InputStream in, Meter bytes, Histogram throughput) { + InstrumentedInputStream(InputStream in, Meter bytes) { super(in); this.bytes = Preconditions.checkNotNull(bytes, "bytes"); - this.throughput = Preconditions.checkNotNull(throughput, "throughput"); - } - - @Override - protected void before(long bytesToRead) { - Preconditions.checkArgument(bytesToRead >= 0); - start = System.nanoTime(); - super.before(bytesToRead); } @Override protected void after(long bytesRead) { if (bytesRead > -1) { - double elapsedSeconds = (System.nanoTime() - start) / 1_000_000_000.0; - long bytesPerSecond = Math.round(bytesRead / elapsedSeconds); - throughput.update(bytesPerSecond); bytes.mark(bytesRead); } super.after(bytesRead); diff --git a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedOutputStream.java b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedOutputStream.java index 838bed9a7..bc7032b5b 100644 --- a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedOutputStream.java +++ b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedOutputStream.java @@ -16,33 +16,20 @@ package com.palantir.tritium.io; -import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.palantir.logsafe.Preconditions; import java.io.OutputStream; final class InstrumentedOutputStream extends ForwardingOutputStream { private final Meter bytes; - private final Histogram throughput; - private long start; - InstrumentedOutputStream(OutputStream in, Meter bytes, Histogram throughput) { + InstrumentedOutputStream(OutputStream in, Meter bytes) { super(in); this.bytes = Preconditions.checkNotNull(bytes, "bytes"); - this.throughput = Preconditions.checkNotNull(throughput, "throughput"); - } - - @Override - protected void before(long bytesToWrite) { - start = System.nanoTime(); - super.before(bytesToWrite); } @Override protected void after(long bytesWritten) { - double elapsedSeconds = (System.nanoTime() - start) / 1_000_000_000.0; - long bytesPerSecond = Math.round(bytesWritten / elapsedSeconds); - throughput.update(bytesPerSecond); this.bytes.mark(bytesWritten); super.after(bytesWritten); } diff --git a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedStreams.java b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedStreams.java index 2c344ff43..c9175ecfc 100644 --- a/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedStreams.java +++ b/tritium-lib/src/main/java/com/palantir/tritium/io/InstrumentedStreams.java @@ -16,8 +16,9 @@ package com.palantir.tritium.io; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; +import com.google.errorprone.annotations.CompileTimeConstant; +import com.palantir.logsafe.Safe; +import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; import java.io.InputStream; import java.io.OutputStream; @@ -25,28 +26,27 @@ public final class InstrumentedStreams { private InstrumentedStreams() {} /** - * Instruments the provided stream to provide a meter tracking bytes read, - * and histogram tracking bytes per second. + * Instruments the provided stream to provide a meter tracking bytes read. * * @param in input - * @param bytes bytes read - * @param throughput bytes read per second + * @param metrics metric registry + * @param type type of stream being instrumented, must be compile-time safe tag * @return instrumented input stream */ - public static InputStream input(InputStream in, Meter bytes, Histogram throughput) { - return new InstrumentedInputStream(in, bytes, throughput); + public static InputStream input( + InputStream in, TaggedMetricRegistry metrics, @Safe @CompileTimeConstant String type) { + return new InstrumentedInputStream(in, IoStreamMetrics.of(metrics).read(type)); } /** - * Instruments the provided stream to provide a meter tracking bytes written, - * and histogram tracking bytes per second. + * Instruments the provided stream to provide a meter tracking bytes written. * * @param out output - * @param bytes bytes read - * @param throughput bytes read per second + * @param type type of stream being instrumented, must be compile-time safe tag * @return instrumented output stream */ - public static OutputStream output(OutputStream out, Meter bytes, Histogram throughput) { - return new InstrumentedOutputStream(out, bytes, throughput); + public static OutputStream output( + OutputStream out, TaggedMetricRegistry metrics, @Safe @CompileTimeConstant String type) { + return new InstrumentedOutputStream(out, IoStreamMetrics.of(metrics).write(type)); } } diff --git a/tritium-lib/src/main/metrics/metrics.yml b/tritium-lib/src/main/metrics/metrics.yml new file mode 100644 index 000000000..243ab8dac --- /dev/null +++ b/tritium-lib/src/main/metrics/metrics.yml @@ -0,0 +1,15 @@ +options: + javaPackage: com.palantir.tritium.io + javaVisibility: packagePrivate +namespaces: + io.stream: + docs: Input/Output stream metrics. + metrics: + read: + type: meter + tags: [type] + docs: Measures the rate of bytes read from an InputStream for a specified type. + write: + type: meter + tags: [type] + docs: Measures the rate of bytes written to an OutputStream for a specified type. diff --git a/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java b/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java index 1884e834e..a73da5122 100644 --- a/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java +++ b/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java @@ -19,68 +19,76 @@ import static org.assertj.core.api.Assertions.assertThat; import com.codahale.metrics.ConsoleReporter; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; +import com.google.common.collect.Iterables; +import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; -import com.palantir.tritium.metrics.MetricRegistries; +import com.palantir.tritium.Tagged; +import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.ThreadLocalRandom; +import java.util.zip.GZIPOutputStream; import org.junit.jupiter.api.Test; class InstrumentedStreamsTest { @Test - @SuppressWarnings("SystemOut") - void copy() throws IOException { + void instrumentedCopy() throws IOException { byte[] bytes = new byte[100 * 1024 * 1024]; - MetricRegistry registry = MetricRegistries.createWithHdrHistogramReservoirs(); - - Meter bytesReadMeter = registry.meter("bytes-read"); - Histogram readThroughput = registry.histogram("bytes-read-per-second"); - - Meter bytesWrittenMeter = registry.meter("bytes-written"); - Histogram writeThroughput = registry.histogram("bytes-written-per-second"); + TaggedMetricRegistry registry = new DefaultTaggedMetricRegistry(); + IoStreamMetrics metrics = IoStreamMetrics.of(registry); int iterations = 10; for (long i = 1; i <= iterations; i++) { try (ConsoleReporter reporter = - ConsoleReporter.forRegistry(registry).build(); + ConsoleReporter.forRegistry(new MetricRegistry()).build(); InputStream input = new ByteArrayInputStream(bytes); OutputStream output = new ByteArrayOutputStream(bytes.length); - InputStream instrumentedInputStream = - InstrumentedStreams.input(input, bytesReadMeter, readThroughput); - OutputStream instrumentedOutputStream = - InstrumentedStreams.output(output, bytesWrittenMeter, writeThroughput)) { + InputStream instrumentedInputStream = InstrumentedStreams.input(input, registry, "test-in"); + OutputStream instrumentedOutputStream = InstrumentedStreams.output(output, registry, "test-out")) { assertThat(ByteStreams.copy(instrumentedInputStream, instrumentedOutputStream)) .isEqualTo(bytes.length); - assertThat(bytesReadMeter.getCount()).isNotZero().isEqualTo(i * bytes.length); - assertThat(bytesWrittenMeter.getCount()).isNotZero().isEqualTo(i * bytes.length); - reporter.report(); + assertThat(metrics.read("test-in").getCount()).isNotZero().isEqualTo(i * bytes.length); + assertThat(metrics.write("test-out").getCount()).isNotZero().isEqualTo(i * bytes.length); + Tagged.report(reporter, registry); } } - assertThat(bytesReadMeter.getCount()).isNotZero().isEqualTo(iterations * bytes.length); - assertThat(readThroughput.getCount()).isGreaterThanOrEqualTo(iterations); - assertThat(readThroughput.getSnapshot()).satisfies(snapshot -> { - assertThat(snapshot.getMin()).isNotZero(); - assertThat(snapshot.getMean()).isNotZero(); - assertThat(snapshot.getMedian()).isNotZero(); - assertThat(snapshot.getMax()).isNotZero(); - System.err.printf("Mean read throughput %.3g GiB/sec %n", snapshot.getMean() / (1024.0 * 1024.0 * 1024.0)); - }); + assertThat(metrics.read("test-in").getCount()).isNotZero().isEqualTo(iterations * bytes.length); + assertThat(metrics.write("test-out").getCount()).isNotZero().isEqualTo(iterations * bytes.length); + } + + @Test + void instrumentedGzip() throws IOException { + byte[] bytes = new byte[1024]; + ThreadLocalRandom.current().nextBytes(bytes); + TaggedMetricRegistry registry = new DefaultTaggedMetricRegistry(); + IoStreamMetrics metrics = IoStreamMetrics.of(registry); + long totalSize = 1L << 31; + ByteSource byteSource = ByteSource.concat(Iterables.cycle(ByteSource.wrap(bytes))); - assertThat(bytesWrittenMeter.getCount()).isNotZero().isEqualTo(iterations * bytes.length); - assertThat(writeThroughput.getCount()).isGreaterThanOrEqualTo(iterations); - assertThat(writeThroughput.getSnapshot()).satisfies(snapshot -> { - assertThat(snapshot.getMin()).isNotZero(); - assertThat(snapshot.getMean()).isNotZero(); - assertThat(snapshot.getMedian()).isNotZero(); - assertThat(snapshot.getMax()).isNotZero(); - System.err.printf("Mean write throughput %.3g GiB/sec %n", snapshot.getMean() / (1024.0 * 1024.0 * 1024.0)); - }); + try (ConsoleReporter reporter = + ConsoleReporter.forRegistry(new MetricRegistry()).build(); + InputStream input = ByteStreams.limit(byteSource.openStream(), totalSize); + InputStream instrumentedInputStream = InstrumentedStreams.input(input, registry, "test-in"); + OutputStream output = ByteStreams.nullOutputStream(); + OutputStream instrumentedRawOutputStream = InstrumentedStreams.output(output, registry, "raw-out"); + OutputStream gzipOut = new GZIPOutputStream(instrumentedRawOutputStream); + OutputStream instrumentedGzipOutputStream = InstrumentedStreams.output(gzipOut, registry, "gzip-out")) { + assertThat(ByteStreams.copy(instrumentedInputStream, instrumentedGzipOutputStream)) + .isEqualTo(totalSize); + assertThat(metrics.read("test-in").getCount()).isNotZero().isEqualTo(totalSize); + assertThat(metrics.write("gzip-out").getCount()) + .isNotZero() + .isEqualTo(totalSize) + .isGreaterThan(metrics.write("raw-out").getCount()); + assertThat(metrics.write("raw-out").getCount()).isNotZero(); + Tagged.report(reporter, registry); + } } } From 633269559730ae07b8be08583ca1bd6ddf011957 Mon Sep 17 00:00:00 2001 From: David Schlosnagle Date: Thu, 13 Jan 2022 15:08:30 -0500 Subject: [PATCH 5/5] test copied bytes --- .../tritium/io/InstrumentedStreamsTest.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java b/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java index a73da5122..35a8629ff 100644 --- a/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java +++ b/tritium-lib/src/test/java/com/palantir/tritium/io/InstrumentedStreamsTest.java @@ -48,19 +48,20 @@ void instrumentedCopy() throws IOException { try (ConsoleReporter reporter = ConsoleReporter.forRegistry(new MetricRegistry()).build(); InputStream input = new ByteArrayInputStream(bytes); - OutputStream output = new ByteArrayOutputStream(bytes.length); - InputStream instrumentedInputStream = InstrumentedStreams.input(input, registry, "test-in"); - OutputStream instrumentedOutputStream = InstrumentedStreams.output(output, registry, "test-out")) { + ByteArrayOutputStream output = new ByteArrayOutputStream(bytes.length); + InputStream instrumentedInputStream = InstrumentedStreams.input(input, registry, "in"); + OutputStream instrumentedOutputStream = InstrumentedStreams.output(output, registry, "out")) { assertThat(ByteStreams.copy(instrumentedInputStream, instrumentedOutputStream)) .isEqualTo(bytes.length); - assertThat(metrics.read("test-in").getCount()).isNotZero().isEqualTo(i * bytes.length); - assertThat(metrics.write("test-out").getCount()).isNotZero().isEqualTo(i * bytes.length); + assertThat(output.toByteArray()).isEqualTo(bytes); + assertThat(metrics.read("in").getCount()).isNotZero().isEqualTo(i * bytes.length); + assertThat(metrics.write("out").getCount()).isNotZero().isEqualTo(i * bytes.length); Tagged.report(reporter, registry); } } - assertThat(metrics.read("test-in").getCount()).isNotZero().isEqualTo(iterations * bytes.length); - assertThat(metrics.write("test-out").getCount()).isNotZero().isEqualTo(iterations * bytes.length); + assertThat(metrics.read("in").getCount()).isNotZero().isEqualTo(iterations * bytes.length); + assertThat(metrics.write("out").getCount()).isNotZero().isEqualTo(iterations * bytes.length); } @Test @@ -75,19 +76,20 @@ void instrumentedGzip() throws IOException { try (ConsoleReporter reporter = ConsoleReporter.forRegistry(new MetricRegistry()).build(); InputStream input = ByteStreams.limit(byteSource.openStream(), totalSize); - InputStream instrumentedInputStream = InstrumentedStreams.input(input, registry, "test-in"); + InputStream instrumentedInputStream = InstrumentedStreams.input(input, registry, "in"); OutputStream output = ByteStreams.nullOutputStream(); - OutputStream instrumentedRawOutputStream = InstrumentedStreams.output(output, registry, "raw-out"); + OutputStream instrumentedRawOutputStream = InstrumentedStreams.output(output, registry, "compressed"); OutputStream gzipOut = new GZIPOutputStream(instrumentedRawOutputStream); - OutputStream instrumentedGzipOutputStream = InstrumentedStreams.output(gzipOut, registry, "gzip-out")) { + OutputStream instrumentedGzipOutputStream = + InstrumentedStreams.output(gzipOut, registry, "to-compress")) { assertThat(ByteStreams.copy(instrumentedInputStream, instrumentedGzipOutputStream)) .isEqualTo(totalSize); - assertThat(metrics.read("test-in").getCount()).isNotZero().isEqualTo(totalSize); - assertThat(metrics.write("gzip-out").getCount()) + assertThat(metrics.read("in").getCount()).isNotZero().isEqualTo(totalSize); + assertThat(metrics.write("to-compress").getCount()) .isNotZero() .isEqualTo(totalSize) - .isGreaterThan(metrics.write("raw-out").getCount()); - assertThat(metrics.write("raw-out").getCount()).isNotZero(); + .isGreaterThan(metrics.write("compressed").getCount()); + assertThat(metrics.write("compressed").getCount()).isNotZero(); Tagged.report(reporter, registry); } }