From 3eea28d5f617f408655054efd33278ce84d0563b Mon Sep 17 00:00:00 2001 From: Ian Botsford <83236726+ianbotsf@users.noreply.github.com> Date: Thu, 4 Jan 2024 13:42:30 -0800 Subject: [PATCH] Add support for metrics in stream response handler (#738) --- .../http/HttpStreamBaseResponseHandler.java | 12 ++- .../awssdk/crt/http/HttpStreamMetrics.java | 79 +++++++++++++++++++ .../crt/http/HttpStreamResponseHandler.java | 10 ++- ...ttpStreamResponseHandlerNativeAdapter.java | 8 ++ src/native/http_request_response.c | 51 ++++++++++++ src/native/java_class_ids.c | 19 +++++ src/native/java_class_ids.h | 8 ++ .../crt/test/HttpRequestResponseFixture.java | 19 +++-- 8 files changed, 196 insertions(+), 10 deletions(-) create mode 100644 src/main/java/software/amazon/awssdk/crt/http/HttpStreamMetrics.java diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBaseResponseHandler.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBaseResponseHandler.java index f3fc5d140..2684e333b 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBaseResponseHandler.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamBaseResponseHandler.java @@ -79,12 +79,20 @@ default int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { return bodyBytesIn.length; } + /** + * Called right before stream is complete, whether successful or unsuccessful. + * @param stream The HTTP stream to which the metrics apply + * @param metrics The [HttpStreamMetrics] containing metrics for the given stream + */ + default void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) { + /* Optional callback, nothing to do by default */ + } + /** * Called from Native when the Response has completed. - * + * * @param stream completed HttpStreamBase * @param errorCode resultant errorCode for the response */ void onResponseComplete(HttpStreamBase stream, int errorCode); - } diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamMetrics.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamMetrics.java new file mode 100644 index 000000000..e245a7c74 --- /dev/null +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamMetrics.java @@ -0,0 +1,79 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package software.amazon.awssdk.crt.http; + +/** + * Holds tracing metrics for an HTTP stream. Maps to `struct aws_http_stream_metrics` in **aws-c-http**'s + * **request_response.h**. + */ +public class HttpStreamMetrics { + private final long sendStartTimestampNs; + private final long sendEndTimestampNs; + private final long sendingDurationNs; + private final long receiveStartTimestampNs; + private final long receiveEndTimestampNs; + private final long receivingDurationNs; + private final int streamId; + + HttpStreamMetrics( + long sendStartTimestampNs, + long sendEndTimestampNs, + long sendingDurationNs, + long receiveStartTimestampNs, + long receiveEndTimestampNs, + long receivingDurationNs, + int streamId + ) { + this.sendStartTimestampNs = sendStartTimestampNs; + this.sendEndTimestampNs = sendEndTimestampNs; + this.sendingDurationNs = sendingDurationNs; + this.receiveStartTimestampNs = receiveStartTimestampNs; + this.receiveEndTimestampNs = receiveEndTimestampNs; + this.receivingDurationNs = receivingDurationNs; + this.streamId = streamId; + } + + public long getSendStartTimestampNs() { + return sendStartTimestampNs; + } + + public long getSendEndTimestampNs() { + return sendEndTimestampNs; + } + + public long getSendingDurationNs() { + return sendingDurationNs; + } + + public long getReceiveStartTimestampNs() { + return receiveStartTimestampNs; + } + + public long getReceiveEndTimestampNs() { + return receiveEndTimestampNs; + } + + public long getReceivingDurationNs() { + return receivingDurationNs; + } + + public int getStreamId() { + return streamId; + } + + @Override + public String toString() { + return "HttpStreamMetrics{" + + "sendStartTimestampNs=" + sendStartTimestampNs + + ", sendEndTimestampNs=" + sendEndTimestampNs + + ", sendingDurationNs=" + sendingDurationNs + + ", receiveStartTimestampNs=" + receiveStartTimestampNs + + ", receiveEndTimestampNs=" + receiveEndTimestampNs + + ", receivingDurationNs=" + receivingDurationNs + + ", streamId=" + streamId + + '}'; + } +} diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandler.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandler.java index 540b6d840..e85108ad8 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandler.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandler.java @@ -58,11 +58,19 @@ default int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { return bodyBytesIn.length; } + /** + * Called right before stream is complete, whether successful or unsuccessful. + * @param stream The HTTP stream to which the metrics apply + * @param metrics The [HttpStreamMetrics] containing metrics for the given stream + */ + default void onMetrics(HttpStream stream, HttpStreamMetrics metrics) { + /* Optional callback, nothing to do by default */ + } + /** * Called from Native when the Response has completed. * @param stream completed stream * @param errorCode resultant errorCode for the response */ void onResponseComplete(HttpStream stream, int errorCode); - } diff --git a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandlerNativeAdapter.java b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandlerNativeAdapter.java index 7a0db0f64..7b6de6cbf 100644 --- a/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandlerNativeAdapter.java +++ b/src/main/java/software/amazon/awssdk/crt/http/HttpStreamResponseHandlerNativeAdapter.java @@ -51,6 +51,14 @@ int onResponseBody(HttpStreamBase stream, ByteBuffer bodyBytesIn) { } } + void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) { + if (this.responseBaseHandler != null) { + responseBaseHandler.onMetrics(stream, metrics); + } else { + responseHandler.onMetrics((HttpStream) stream, metrics); + } + } + void onResponseComplete(HttpStreamBase stream, int errorCode) { if (this.responseBaseHandler != null) { responseBaseHandler.onResponseComplete(stream, errorCode); diff --git a/src/native/http_request_response.c b/src/native/http_request_response.c index 4dd85b7b6..80f8be16c 100644 --- a/src/native/http_request_response.c +++ b/src/native/http_request_response.c @@ -311,6 +311,56 @@ void aws_java_http_stream_on_stream_destroy_fn(void *user_data) { /********** JNI ENV RELEASE **********/ } +void aws_java_http_stream_on_stream_metrics_fn( + struct aws_http_stream *stream, + const struct aws_http_stream_metrics *metrics, + void *user_data) { + struct http_stream_binding *binding = (struct http_stream_binding *)user_data; + + /********** JNI ENV ACQUIRE **********/ + JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm); + if (env == NULL) { + /* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */ + return; + } + + /* Convert metrics to Java HttpStreamMetrics obj */ + jobject jni_metrics = (*env)->NewObject( + env, + http_stream_metrics_properties.http_stream_metrics_class, + http_stream_metrics_properties.constructor_id, + (jlong)metrics->send_start_timestamp_ns, + (jlong)metrics->send_end_timestamp_ns, + (jlong)metrics->sending_duration_ns, + (jlong)metrics->receive_start_timestamp_ns, + (jlong)metrics->receive_end_timestamp_ns, + (jlong)metrics->receiving_duration_ns, + + /* Stream IDs are 31-bit unsigned integers, which fits into Java's regular (signed) 32-bit int */ + (jint)metrics->stream_id); + + (*env)->CallVoidMethod( + env, + binding->java_http_response_stream_handler, + http_stream_response_handler_properties.onMetrics, + binding->java_http_stream_base, + jni_metrics); + + /* Delete local reference to metrics object */ + (*env)->DeleteLocalRef(env, jni_metrics); + + if (aws_jni_check_and_clear_exception(env)) { + /* Close the Connection if the Java Callback throws an Exception */ + aws_http_connection_close(aws_http_stream_get_connection(stream)); + + AWS_LOGF_ERROR(AWS_LS_HTTP_STREAM, "id=%p: Received Exception from onMetrics", (void *)stream); + aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE); + } + + aws_jni_release_thread_env(binding->jvm, env); + /********** JNI ENV RELEASE **********/ +} + jobjectArray aws_java_http_headers_from_native(JNIEnv *env, struct aws_http_headers *headers) { (void)headers; jobjectArray ret; @@ -383,6 +433,7 @@ static jobject s_make_request_general( .on_response_body = aws_java_http_stream_on_incoming_body_fn, .on_complete = aws_java_http_stream_on_stream_complete_fn, .on_destroy = aws_java_http_stream_on_stream_destroy_fn, + .on_metrics = aws_java_http_stream_on_stream_metrics_fn, .user_data = stream_binding, }; diff --git a/src/native/java_class_ids.c b/src/native/java_class_ids.c index 74a607384..a1191b7df 100644 --- a/src/native/java_class_ids.c +++ b/src/native/java_class_ids.c @@ -522,6 +522,13 @@ static void s_cache_http_stream_response_handler_native_adapter(JNIEnv *env) { http_stream_response_handler_properties.onResponseComplete = (*env)->GetMethodID(env, cls, "onResponseComplete", "(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;I)V"); AWS_FATAL_ASSERT(http_stream_response_handler_properties.onResponseComplete); + + http_stream_response_handler_properties.onMetrics = (*env)->GetMethodID( + env, + cls, + "onMetrics", + "(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;Lsoftware/amazon/awssdk/crt/http/HttpStreamMetrics;)V"); + AWS_FATAL_ASSERT(http_stream_response_handler_properties.onMetrics); } struct java_http_stream_write_chunk_completion_properties http_stream_write_chunk_completion_properties; @@ -535,6 +542,17 @@ static void s_cache_http_stream_write_chunk_completion_properties(JNIEnv *env) { AWS_FATAL_ASSERT(http_stream_write_chunk_completion_properties.callback); } +struct java_http_stream_metrics_properties http_stream_metrics_properties; + +static void s_cache_http_stream_metrics_properties(JNIEnv *env) { + jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/http/HttpStreamMetrics"); + AWS_FATAL_ASSERT(cls); + http_stream_metrics_properties.http_stream_metrics_class = (*env)->NewGlobalRef(env, cls); + + http_stream_metrics_properties.constructor_id = (*env)->GetMethodID(env, cls, "", "(JJJJJJI)V"); + AWS_FATAL_ASSERT(http_stream_metrics_properties.constructor_id); +} + struct java_event_stream_server_listener_properties event_stream_server_listener_properties; static void s_cache_event_stream_server_listener_properties(JNIEnv *env) { @@ -2316,6 +2334,7 @@ static void s_cache_java_class_ids(void *user_data) { s_cache_http2_stream(env); s_cache_http_stream_response_handler_native_adapter(env); s_cache_http_stream_write_chunk_completion_properties(env); + s_cache_http_stream_metrics_properties(env); s_cache_event_stream_server_listener_properties(env); s_cache_event_stream_server_listener_handler_properties(env); s_cache_event_stream_server_connection_handler_properties(env); diff --git a/src/native/java_class_ids.h b/src/native/java_class_ids.h index 72ae187f2..d9ffae3af 100644 --- a/src/native/java_class_ids.h +++ b/src/native/java_class_ids.h @@ -236,6 +236,7 @@ struct java_http_stream_response_handler_native_adapter_properties { jmethodID onResponseHeadersDone; jmethodID onResponseBody; jmethodID onResponseComplete; + jmethodID onMetrics; }; extern struct java_http_stream_response_handler_native_adapter_properties http_stream_response_handler_properties; @@ -245,6 +246,13 @@ struct java_http_stream_write_chunk_completion_properties { }; extern struct java_http_stream_write_chunk_completion_properties http_stream_write_chunk_completion_properties; +/* HtppStreamMetrics */ +struct java_http_stream_metrics_properties { + jclass http_stream_metrics_class; + jmethodID constructor_id; +}; +extern struct java_http_stream_metrics_properties http_stream_metrics_properties; + /* EventStreamServerListener */ struct java_event_stream_server_listener_properties { jmethodID onShutdownComplete; diff --git a/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseFixture.java b/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseFixture.java index a22b1a4e5..d4d1ca7fe 100644 --- a/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseFixture.java +++ b/src/test/java/software/amazon/awssdk/crt/test/HttpRequestResponseFixture.java @@ -7,18 +7,15 @@ import org.junit.Assert; import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.CrtResource; -import software.amazon.awssdk.crt.http.Http2ClientConnection; -import software.amazon.awssdk.crt.http.Http2Request; import software.amazon.awssdk.crt.http.HttpClientConnection; import software.amazon.awssdk.crt.http.HttpClientConnectionManager; -import software.amazon.awssdk.crt.http.HttpVersion; import software.amazon.awssdk.crt.http.HttpHeader; -import software.amazon.awssdk.crt.http.HttpRequest; import software.amazon.awssdk.crt.http.HttpRequestBase; -import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; -import software.amazon.awssdk.crt.http.HttpStreamBase; import software.amazon.awssdk.crt.http.HttpStream; -import software.amazon.awssdk.crt.http.Http2Stream; +import software.amazon.awssdk.crt.http.HttpStreamBase; +import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler; +import software.amazon.awssdk.crt.http.HttpStreamMetrics; +import software.amazon.awssdk.crt.http.HttpVersion; import java.net.URI; import java.nio.ByteBuffer; @@ -31,6 +28,7 @@ import java.security.NoSuchAlgorithmException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; public class HttpRequestResponseFixture extends HttpClientTestFixture { @@ -94,6 +92,7 @@ public TestHttpResponse getResponse(URI uri, HttpRequestBase request, byte[] chu boolean actuallyConnected = false; final CompletableFuture reqCompleted = new CompletableFuture<>(); + final AtomicReference metricsRef = new AtomicReference<>(null); final TestHttpResponse response = new TestHttpResponse(); @@ -126,6 +125,11 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) { return amountRead; } + @Override + public void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) { + Assert.assertTrue(metricsRef.compareAndSet(null, metrics)); + } + @Override public void onResponseComplete(HttpStreamBase stream, int errorCode) { response.onCompleteErrorCode = errorCode; @@ -149,6 +153,7 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) { } Assert.assertTrue(actuallyConnected); + Assert.assertNotNull(metricsRef.get()); shutdownComplete.get(60, TimeUnit.SECONDS);