diff --git a/changelog/@unreleased/pr-486.v2.yml b/changelog/@unreleased/pr-486.v2.yml new file mode 100644 index 000000000..991618ca2 --- /dev/null +++ b/changelog/@unreleased/pr-486.v2.yml @@ -0,0 +1,5 @@ +type: improvement +improvement: + description: Report metrics for in-flight requests + links: + - https://github.com/palantir/dialogue/pull/486 diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/ActiveRequestInstrumentationChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/ActiveRequestInstrumentationChannel.java new file mode 100644 index 000000000..1f11e0dfe --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/ActiveRequestInstrumentationChannel.java @@ -0,0 +1,59 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.codahale.metrics.Counter; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.errorprone.annotations.CompileTimeConstant; +import com.palantir.dialogue.Channel; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.Response; + +final class ActiveRequestInstrumentationChannel implements Channel { + + private final String stage; + private final Channel delegate; + private final DialogueClientMetrics metrics; + + ActiveRequestInstrumentationChannel( + Channel delegate, @CompileTimeConstant String stage, DialogueClientMetrics metrics) { + // The delegate must never be allowed to throw, otherwise the counter may be incremented without + // being decremented. + this.delegate = new NeverThrowChannel(delegate); + this.stage = stage; + this.metrics = metrics; + } + + @Override + public ListenableFuture execute(Endpoint endpoint, Request request) { + Counter counter = metrics.requestActive() + .serviceName(endpoint.serviceName()) + .stage(stage) + .build(); + counter.inc(); + ListenableFuture result = delegate.execute(endpoint, request); + result.addListener(counter::dec, MoreExecutors.directExecutor()); + return result; + } + + @Override + public String toString() { + return "ActiveRequestInstrumentationChannel{" + "delegate=" + delegate + ", stage=" + stage + '}'; + } +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/Channels.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/Channels.java index b6d5d75f7..995602e19 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/Channels.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/Channels.java @@ -99,6 +99,7 @@ public Channel build() { List limitedChannels = channels.stream() // Instrument inner-most channel with metrics so that we measure only the over-the-wire-time .map(channel -> new InstrumentedChannel(channel, clientMetrics)) + .map(channel -> new ActiveRequestInstrumentationChannel(channel, "running", clientMetrics)) // TracedChannel must wrap TracedRequestChannel to ensure requests have tracing headers. .map(TracedRequestChannel::new) .map(channel -> new TracedChannel(channel, "Dialogue-http-request")) @@ -116,6 +117,7 @@ public Channel build() { channel = new ContentDecodingChannel(channel); channel = new NeverThrowChannel(channel); channel = new TracedChannel(channel, "Dialogue-request"); + channel = new ActiveRequestInstrumentationChannel(channel, "processing", clientMetrics); return channel; } diff --git a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml index c7a46a908..db645a049 100644 --- a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml +++ b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml @@ -9,6 +9,13 @@ namespaces: type: timer tags: [service-name] docs: Request time, note that this does not include time spent reading the response body. + request.active: + type: counter + tags: [service-name, stage] + docs: + Number of requests that are actively running. The `stage` may refer to `running` requests actively + executing over the wire or `processing` which may be awaiting a client or backing off for a retry. + Note that running requests are also counted as processing. deprecations: type: meter tags: [service-name] diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/ActiveRequestInstrumentationChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/ActiveRequestInstrumentationChannelTest.java new file mode 100644 index 000000000..0e9e1c332 --- /dev/null +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/ActiveRequestInstrumentationChannelTest.java @@ -0,0 +1,81 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.codahale.metrics.Counter; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.palantir.dialogue.Channel; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.HttpMethod; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.Response; +import com.palantir.dialogue.UrlBuilder; +import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.util.Map; +import org.junit.jupiter.api.Test; + +final class ActiveRequestInstrumentationChannelTest { + + @Test + public void testActiveRequests() { + SettableFuture future = SettableFuture.create(); + Channel stub = (_endpoint, _request) -> future; + DialogueClientMetrics metrics = DialogueClientMetrics.of(new DefaultTaggedMetricRegistry()); + ActiveRequestInstrumentationChannel instrumented = + new ActiveRequestInstrumentationChannel(stub, "stage", metrics); + ListenableFuture result = + instrumented.execute(StubEndpoint.INSTANCE, Request.builder().build()); + assertThat(result).isNotDone(); + Counter counter = metrics.requestActive() + .serviceName("StubService") + .stage("stage") + .build(); + assertThat(counter.getCount()).isOne(); + future.cancel(false); + assertThat(counter.getCount()).isZero(); + } + + enum StubEndpoint implements Endpoint { + INSTANCE; + + @Override + public void renderPath(Map _params, UrlBuilder _url) {} + + @Override + public HttpMethod httpMethod() { + return HttpMethod.GET; + } + + @Override + public String serviceName() { + return "StubService"; + } + + @Override + public String endpointName() { + return "stubEndpoint"; + } + + @Override + public String version() { + return "0.0.1"; + } + } +}