Skip to content

Commit

Permalink
Add support for metrics in stream response handler (#738)
Browse files Browse the repository at this point in the history
  • Loading branch information
ianbotsf authored Jan 4, 2024
1 parent 9cf1986 commit 3eea28d
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,20 @@ default int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
return bodyBytesIn.length;
}

/**
* Called right before stream is complete, whether successful or unsuccessful.
* @param stream The HTTP stream to which the metrics apply
* @param metrics The [HttpStreamMetrics] containing metrics for the given stream
*/
default void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) {
/* Optional callback, nothing to do by default */
}

/**
* Called from Native when the Response has completed.
*
*
* @param stream completed HttpStreamBase
* @param errorCode resultant errorCode for the response
*/
void onResponseComplete(HttpStreamBase stream, int errorCode);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package software.amazon.awssdk.crt.http;

/**
* Holds tracing metrics for an HTTP stream. Maps to `struct aws_http_stream_metrics` in **aws-c-http**'s
* **request_response.h**.
*/
public class HttpStreamMetrics {
private final long sendStartTimestampNs;
private final long sendEndTimestampNs;
private final long sendingDurationNs;
private final long receiveStartTimestampNs;
private final long receiveEndTimestampNs;
private final long receivingDurationNs;
private final int streamId;

HttpStreamMetrics(
long sendStartTimestampNs,
long sendEndTimestampNs,
long sendingDurationNs,
long receiveStartTimestampNs,
long receiveEndTimestampNs,
long receivingDurationNs,
int streamId
) {
this.sendStartTimestampNs = sendStartTimestampNs;
this.sendEndTimestampNs = sendEndTimestampNs;
this.sendingDurationNs = sendingDurationNs;
this.receiveStartTimestampNs = receiveStartTimestampNs;
this.receiveEndTimestampNs = receiveEndTimestampNs;
this.receivingDurationNs = receivingDurationNs;
this.streamId = streamId;
}

public long getSendStartTimestampNs() {
return sendStartTimestampNs;
}

public long getSendEndTimestampNs() {
return sendEndTimestampNs;
}

public long getSendingDurationNs() {
return sendingDurationNs;
}

public long getReceiveStartTimestampNs() {
return receiveStartTimestampNs;
}

public long getReceiveEndTimestampNs() {
return receiveEndTimestampNs;
}

public long getReceivingDurationNs() {
return receivingDurationNs;
}

public int getStreamId() {
return streamId;
}

@Override
public String toString() {
return "HttpStreamMetrics{" +
"sendStartTimestampNs=" + sendStartTimestampNs +
", sendEndTimestampNs=" + sendEndTimestampNs +
", sendingDurationNs=" + sendingDurationNs +
", receiveStartTimestampNs=" + receiveStartTimestampNs +
", receiveEndTimestampNs=" + receiveEndTimestampNs +
", receivingDurationNs=" + receivingDurationNs +
", streamId=" + streamId +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,19 @@ default int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
return bodyBytesIn.length;
}

/**
* Called right before stream is complete, whether successful or unsuccessful.
* @param stream The HTTP stream to which the metrics apply
* @param metrics The [HttpStreamMetrics] containing metrics for the given stream
*/
default void onMetrics(HttpStream stream, HttpStreamMetrics metrics) {
/* Optional callback, nothing to do by default */
}

/**
* Called from Native when the Response has completed.
* @param stream completed stream
* @param errorCode resultant errorCode for the response
*/
void onResponseComplete(HttpStream stream, int errorCode);

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ int onResponseBody(HttpStreamBase stream, ByteBuffer bodyBytesIn) {
}
}

void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) {
if (this.responseBaseHandler != null) {
responseBaseHandler.onMetrics(stream, metrics);
} else {
responseHandler.onMetrics((HttpStream) stream, metrics);
}
}

void onResponseComplete(HttpStreamBase stream, int errorCode) {
if (this.responseBaseHandler != null) {
responseBaseHandler.onResponseComplete(stream, errorCode);
Expand Down
51 changes: 51 additions & 0 deletions src/native/http_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,56 @@ void aws_java_http_stream_on_stream_destroy_fn(void *user_data) {
/********** JNI ENV RELEASE **********/
}

void aws_java_http_stream_on_stream_metrics_fn(
struct aws_http_stream *stream,
const struct aws_http_stream_metrics *metrics,
void *user_data) {
struct http_stream_binding *binding = (struct http_stream_binding *)user_data;

/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(binding->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}

/* Convert metrics to Java HttpStreamMetrics obj */
jobject jni_metrics = (*env)->NewObject(
env,
http_stream_metrics_properties.http_stream_metrics_class,
http_stream_metrics_properties.constructor_id,
(jlong)metrics->send_start_timestamp_ns,
(jlong)metrics->send_end_timestamp_ns,
(jlong)metrics->sending_duration_ns,
(jlong)metrics->receive_start_timestamp_ns,
(jlong)metrics->receive_end_timestamp_ns,
(jlong)metrics->receiving_duration_ns,

/* Stream IDs are 31-bit unsigned integers, which fits into Java's regular (signed) 32-bit int */
(jint)metrics->stream_id);

(*env)->CallVoidMethod(
env,
binding->java_http_response_stream_handler,
http_stream_response_handler_properties.onMetrics,
binding->java_http_stream_base,
jni_metrics);

/* Delete local reference to metrics object */
(*env)->DeleteLocalRef(env, jni_metrics);

if (aws_jni_check_and_clear_exception(env)) {
/* Close the Connection if the Java Callback throws an Exception */
aws_http_connection_close(aws_http_stream_get_connection(stream));

AWS_LOGF_ERROR(AWS_LS_HTTP_STREAM, "id=%p: Received Exception from onMetrics", (void *)stream);
aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
}

aws_jni_release_thread_env(binding->jvm, env);
/********** JNI ENV RELEASE **********/
}

jobjectArray aws_java_http_headers_from_native(JNIEnv *env, struct aws_http_headers *headers) {
(void)headers;
jobjectArray ret;
Expand Down Expand Up @@ -383,6 +433,7 @@ static jobject s_make_request_general(
.on_response_body = aws_java_http_stream_on_incoming_body_fn,
.on_complete = aws_java_http_stream_on_stream_complete_fn,
.on_destroy = aws_java_http_stream_on_stream_destroy_fn,
.on_metrics = aws_java_http_stream_on_stream_metrics_fn,
.user_data = stream_binding,
};

Expand Down
19 changes: 19 additions & 0 deletions src/native/java_class_ids.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,13 @@ static void s_cache_http_stream_response_handler_native_adapter(JNIEnv *env) {
http_stream_response_handler_properties.onResponseComplete =
(*env)->GetMethodID(env, cls, "onResponseComplete", "(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;I)V");
AWS_FATAL_ASSERT(http_stream_response_handler_properties.onResponseComplete);

http_stream_response_handler_properties.onMetrics = (*env)->GetMethodID(
env,
cls,
"onMetrics",
"(Lsoftware/amazon/awssdk/crt/http/HttpStreamBase;Lsoftware/amazon/awssdk/crt/http/HttpStreamMetrics;)V");
AWS_FATAL_ASSERT(http_stream_response_handler_properties.onMetrics);
}

struct java_http_stream_write_chunk_completion_properties http_stream_write_chunk_completion_properties;
Expand All @@ -535,6 +542,17 @@ static void s_cache_http_stream_write_chunk_completion_properties(JNIEnv *env) {
AWS_FATAL_ASSERT(http_stream_write_chunk_completion_properties.callback);
}

struct java_http_stream_metrics_properties http_stream_metrics_properties;

static void s_cache_http_stream_metrics_properties(JNIEnv *env) {
jclass cls = (*env)->FindClass(env, "software/amazon/awssdk/crt/http/HttpStreamMetrics");
AWS_FATAL_ASSERT(cls);
http_stream_metrics_properties.http_stream_metrics_class = (*env)->NewGlobalRef(env, cls);

http_stream_metrics_properties.constructor_id = (*env)->GetMethodID(env, cls, "<init>", "(JJJJJJI)V");
AWS_FATAL_ASSERT(http_stream_metrics_properties.constructor_id);
}

struct java_event_stream_server_listener_properties event_stream_server_listener_properties;

static void s_cache_event_stream_server_listener_properties(JNIEnv *env) {
Expand Down Expand Up @@ -2316,6 +2334,7 @@ static void s_cache_java_class_ids(void *user_data) {
s_cache_http2_stream(env);
s_cache_http_stream_response_handler_native_adapter(env);
s_cache_http_stream_write_chunk_completion_properties(env);
s_cache_http_stream_metrics_properties(env);
s_cache_event_stream_server_listener_properties(env);
s_cache_event_stream_server_listener_handler_properties(env);
s_cache_event_stream_server_connection_handler_properties(env);
Expand Down
8 changes: 8 additions & 0 deletions src/native/java_class_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ struct java_http_stream_response_handler_native_adapter_properties {
jmethodID onResponseHeadersDone;
jmethodID onResponseBody;
jmethodID onResponseComplete;
jmethodID onMetrics;
};
extern struct java_http_stream_response_handler_native_adapter_properties http_stream_response_handler_properties;

Expand All @@ -245,6 +246,13 @@ struct java_http_stream_write_chunk_completion_properties {
};
extern struct java_http_stream_write_chunk_completion_properties http_stream_write_chunk_completion_properties;

/* HtppStreamMetrics */
struct java_http_stream_metrics_properties {
jclass http_stream_metrics_class;
jmethodID constructor_id;
};
extern struct java_http_stream_metrics_properties http_stream_metrics_properties;

/* EventStreamServerListener */
struct java_event_stream_server_listener_properties {
jmethodID onShutdownComplete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,15 @@
import org.junit.Assert;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.http.Http2ClientConnection;
import software.amazon.awssdk.crt.http.Http2Request;
import software.amazon.awssdk.crt.http.HttpClientConnection;
import software.amazon.awssdk.crt.http.HttpClientConnectionManager;
import software.amazon.awssdk.crt.http.HttpVersion;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.http.HttpRequestBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStream;
import software.amazon.awssdk.crt.http.Http2Stream;
import software.amazon.awssdk.crt.http.HttpStreamBase;
import software.amazon.awssdk.crt.http.HttpStreamBaseResponseHandler;
import software.amazon.awssdk.crt.http.HttpStreamMetrics;
import software.amazon.awssdk.crt.http.HttpVersion;

import java.net.URI;
import java.nio.ByteBuffer;
Expand All @@ -31,6 +28,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class HttpRequestResponseFixture extends HttpClientTestFixture {

Expand Down Expand Up @@ -94,6 +92,7 @@ public TestHttpResponse getResponse(URI uri, HttpRequestBase request, byte[] chu
boolean actuallyConnected = false;

final CompletableFuture<Void> reqCompleted = new CompletableFuture<>();
final AtomicReference<HttpStreamMetrics> metricsRef = new AtomicReference<>(null);

final TestHttpResponse response = new TestHttpResponse();

Expand Down Expand Up @@ -126,6 +125,11 @@ public int onResponseBody(HttpStreamBase stream, byte[] bodyBytesIn) {
return amountRead;
}

@Override
public void onMetrics(HttpStreamBase stream, HttpStreamMetrics metrics) {
Assert.assertTrue(metricsRef.compareAndSet(null, metrics));
}

@Override
public void onResponseComplete(HttpStreamBase stream, int errorCode) {
response.onCompleteErrorCode = errorCode;
Expand All @@ -149,6 +153,7 @@ public void onResponseComplete(HttpStreamBase stream, int errorCode) {
}

Assert.assertTrue(actuallyConnected);
Assert.assertNotNull(metricsRef.get());

shutdownComplete.get(60, TimeUnit.SECONDS);

Expand Down

0 comments on commit 3eea28d

Please sign in to comment.