Skip to content

Commit

Permalink
Integrate Micrometer with WebSockets Next
Browse files Browse the repository at this point in the history
  • Loading branch information
michalvavrik committed Nov 7, 2024
1 parent 5810fca commit eb5f127
Show file tree
Hide file tree
Showing 91 changed files with 2,897 additions and 55 deletions.
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/telemetry-micrometer.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ Refer to the xref:./management-interface-reference.adoc[management interface ref
** Camel Messaging
* https://quarkus.io/guides/stork-reference[`quarkus-smallrye-stork`]
* https://quarkus.io/guides/vertx[`quarkus-vertx`] (http requests)
* xref:websockets-next-reference.adoc[`websockets-next`]

== Configuration Reference

Expand Down
9 changes: 9 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,15 @@ quarkus.websockets-next.server.traces.enabled=false
quarkus.websockets-next.client.traces.enabled=false
----

When the Micrometer extension is present, metrics for messages, errors and bytes transferred are collected.
If you do not require WebSocket metrics, you can disable metrics like in the example below:

[source, properties]
----
quarkus.websockets-next.server.metrics.enabled=false
quarkus.websockets-next.client.metrics.enabled=false
----

NOTE: Telemetry for the `BasicWebSocketConnector` is currently not supported.

[[websocket-next-configuration-reference]]
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.quarkus.websockets.next.test.utils.WSClient.ReceiverMode.BINARY;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.Set;
import java.util.stream.Collectors;

import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.WebSocketConnectOptions;

record Connection(URI uri, String[] messagesToSend, WSClient client, boolean broadcast, boolean binaryMode,
String[] expectedResponses) {

static Connection of(URI uri, boolean broadcast, boolean binaryMode, String[] sentMessages, String[] expectedResponses) {
return new Connection(uri, sentMessages, null, broadcast, binaryMode, expectedResponses);
}

static Connection of(URI uri, String expectedResponse, boolean binaryMode, String... messages) {
return new Connection(uri, messages, null, false, binaryMode, new String[] { expectedResponse });
}

private Connection with(WSClient client) {
return new Connection(uri, messagesToSend, client, broadcast, binaryMode, expectedResponses);
}

private Set<String> getReceivedMessages() {
return client.getMessages().stream().map(Buffer::toString).collect(Collectors.toSet());
}

static void sendAndAssertResponses(Vertx vertx, Connection... connections) {
openConnectionsThenSend(connections, vertx, 0);
}

private static void openConnectionsThenSend(Connection[] connections, Vertx vertx, int idx) {
var connection = connections[idx];
final WSClient client = connection.binaryMode() ? new WSClient(vertx, BINARY) : new WSClient(vertx);
try (client) {
client.connect(new WebSocketConnectOptions(), connection.uri());
connections[idx] = connection.with(client);

if (idx < connections.length - 1) {
openConnectionsThenSend(connections, vertx, idx + 1);
} else {
sendMessages(connections, connection.binaryMode());
}
}
}

private static void sendMessages(Connection[] connections, boolean binaryMode) {
for (Connection connection : connections) {
for (String message : connection.messagesToSend()) {
if (binaryMode) {
connection.client().sendAndAwait(Buffer.buffer(message));
} else {
connection.client().sendAndAwait(message);
}
}
var expectedResponses = connection.expectedResponses();
if (expectedResponses.length != 0) {
if (connection.broadcast()) {
for (Connection conn : connections) {
assertResponses(conn, expectedResponses);
}
} else {
assertResponses(connection, expectedResponses);
}
}
}
}

private static void assertResponses(Connection connection, String[] expectedResponses) {
connection.client.waitForMessages(expectedResponses.length);
Set<String> actualResponses = connection.getReceivedMessages();

for (String expectedResponse : expectedResponses) {
assertTrue(actualResponses.contains(expectedResponse),
() -> "Expected response '%s' not found, was: %s".formatted(expectedResponse, actualResponses));
}

connection.client().getMessages().clear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.quarkus.websockets.next.test.telemetry;

import java.util.Arrays;

public interface ExpectedServerEndpointResponse {

String[] NO_RESPONSE = new String[] {};
EchoExpectedServerEndpointResponse ECHO_RESPONSE = new EchoExpectedServerEndpointResponse();
DoubleEchoExpectedServerEndpointResponse DOUBLE_ECHO_RESPONSE = new DoubleEchoExpectedServerEndpointResponse();

/**
* Endpoint returns void, Uni<Void> or results in exception and theefore, there is no response.
*/
final class NoExpectedServerEndpointResponse {

public String[] getExpectedResponse() {
return new String[0];
}
}

/**
* Received message is prefixed with 'echo 0: ' and returned.
*/
final class EchoExpectedServerEndpointResponse implements ExpectedServerEndpointResponse {

public String[] getExpectedResponse(String[] sentMessages) {
return Arrays.stream(sentMessages).map(msg -> "echo 0: " + msg).toArray(String[]::new);
}

}

/**
* For each received message 'msg' endpoint returns 'echo 0: msg' and 'echo 1: msg'
*/
final class DoubleEchoExpectedServerEndpointResponse implements ExpectedServerEndpointResponse {

public String[] getExpectedResponse(String[] sentMessages) {
return Arrays.stream(sentMessages)
.mapMulti((msg, consumer) -> {
consumer.accept("echo 0: " + msg);
consumer.accept("echo 1: " + msg);
})
.toArray(String[]::new);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package io.quarkus.websockets.next.test.telemetry;

import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_CONNECTION_CLOSED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_CONNECTION_OPENED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_MESSAGES_COUNT_ERRORS;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_MESSAGES_COUNT_RECEIVED_BYTES;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_MESSAGES_COUNT_SENT;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.CLIENT_MESSAGES_COUNT_SENT_BYTES;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_CONNECTION_CLOSED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_CONNECTION_OPENED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_CONNECTION_OPENED_ERROR;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_MESSAGES_COUNT_ERRORS;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_MESSAGES_COUNT_RECEIVED;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_MESSAGES_COUNT_RECEIVED_BYTES;
import static io.quarkus.websockets.next.runtime.telemetry.TelemetryConstants.SERVER_MESSAGES_COUNT_SENT_BYTES;
import static io.quarkus.websockets.next.test.telemetry.AbstractWebSocketsOnMessageTest.getMetrics;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;

import org.awaitility.Awaitility;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;

public final class MetricsAsserter {

int serverReceivedCount;
int serverReceivedCountBytes;
int serverSentCountBytes;
int clientSentCount;
int clientSentCountBytes;
int clientReceivedCountBytes;
int clientErrorCount;
int serverErrorCount;
int clientConnectionOpenedCount;
int serverConnectionOpenedCount;

void assertTotalMetricsForAllPaths(int serverErrorsDelta, int serverReceivedCountDelta, Connection connection) {
int serverSentCountBytesDelta = connectionToSentBytes(connection);
int serverReceivedCountBytesDelta = connectionToReceivedBytes(connection);
assertTotalMetricsForAllPaths(serverErrorsDelta, 0, serverReceivedCountDelta, serverReceivedCountBytesDelta,
serverSentCountBytesDelta, 0, 0, 0);
}

void assertTotalMetricsForAllPaths(int serverErrorsDelta, int serverReceivedCountDelta, int serverSentCountBytesDelta,
int serverReceivedCountBytesDelta) {
assertTotalMetricsForAllPaths(serverErrorsDelta, 0, serverReceivedCountDelta, serverReceivedCountBytesDelta,
serverSentCountBytesDelta, 0, 0, 0);
}

void assertTotalMetricsForAllPaths(int serverErrorsDelta, int clientErrorsDelta, int serverReceivedCountDelta,
int serverReceivedCountBytesDelta, int serverSentCountBytesDelta, int clientSentCountDelta,
int clientSentCountBytesDelta, int clientReceivedCountBytesDelta) {
addDeltasToTotalsMeasuredPreviously(serverErrorsDelta, clientErrorsDelta, serverReceivedCountDelta,
serverReceivedCountBytesDelta, serverSentCountBytesDelta, clientSentCountDelta, clientSentCountBytesDelta,
clientReceivedCountBytesDelta);

Awaitility.await().atMost(Duration.ofSeconds(12)).untilAsserted(() -> getMetrics()
.body(assertServerConnectionOpenedTotal(serverConnectionOpenedCount))
.body(assertClientConnectionOpenedTotal(clientConnectionOpenedCount))
.body(assertServerErrorTotal(serverErrorCount))
.body(assertClientErrorTotal(clientErrorCount))
.body(assertClientMessagesCountBytesSent(clientSentCountBytes))
.body(assertClientMessagesCountBytesReceived(clientReceivedCountBytes))
.body(assertClientMessagesCountSent(clientSentCount))
.body(assertServerMessagesCountBytesReceived(serverReceivedCountBytes))
.body(assertServerMessagesCountBytesSent(serverSentCountBytes))
.body(assertServerMessagesCountReceived(serverReceivedCount)));
}

private int connectionToReceivedBytes(Connection connection) {
return stringToBytes(connection.messagesToSend());
}

private int connectionToSentBytes(Connection connection) {
return stringToBytes(connection.expectedResponses());
}

private void addDeltasToTotalsMeasuredPreviously(int serverErrorsDelta, int clientErrorsDelta, int serverReceivedCountDelta,
int serverReceivedCountBytesDelta, int serverSentCountBytesDelta, int clientSentCountDelta,
int clientSentCountBytesDelta, int clientReceivedCountBytesDelta) {
serverReceivedCount += serverReceivedCountDelta;
serverReceivedCountBytes += serverReceivedCountBytesDelta;
serverSentCountBytes += serverSentCountBytesDelta;
clientSentCount += clientSentCountDelta;
clientSentCountBytes += clientSentCountBytesDelta;
clientReceivedCountBytes += clientReceivedCountBytesDelta;
clientErrorCount += clientErrorsDelta;
serverErrorCount += serverErrorsDelta;
}

static Matcher<String> assertClientMessagesCountBytesSent(String path, int clientSentCountBytes) {
return assertTotal(CLIENT_MESSAGES_COUNT_SENT_BYTES, clientSentCountBytes, path);
}

static Matcher<String> assertClientMessagesCountBytesReceived(String path, int clientReceivedCountBytes) {
return assertTotal(CLIENT_MESSAGES_COUNT_RECEIVED_BYTES, clientReceivedCountBytes, path);
}

static Matcher<String> assertClientMessagesCountSent(String path, int clientSentCount) {
return assertTotal(CLIENT_MESSAGES_COUNT_SENT, clientSentCount, path);
}

static Matcher<String> assertServerMessagesCountReceived(String path, int serverReceivedCount) {
return assertTotal(SERVER_MESSAGES_COUNT_RECEIVED, serverReceivedCount, path);
}

static Matcher<String> assertServerMessagesCountBytesSent(String path, int serverSentCountBytes) {
return assertTotal(SERVER_MESSAGES_COUNT_SENT_BYTES, serverSentCountBytes, path);
}

static Matcher<String> assertServerMessagesCountBytesReceived(String path, int serverReceivedCountBytes) {
return assertTotal(SERVER_MESSAGES_COUNT_RECEIVED_BYTES, serverReceivedCountBytes, path);
}

static Matcher<String> assertServerErrorTotal(String path, int serverErrorCount) {
return assertTotal(SERVER_MESSAGES_COUNT_ERRORS, serverErrorCount, path);
}

static Matcher<String> assertClientErrorTotal(String path, int clientErrorCount) {
return assertTotal(CLIENT_MESSAGES_COUNT_ERRORS, clientErrorCount, path);
}

static Matcher<String> assertServerConnectionOpeningFailedTotal(String path, int serverConnectionOpeningFailedCount) {
return assertTotal(SERVER_CONNECTION_OPENED_ERROR, serverConnectionOpeningFailedCount, path);
}

static Matcher<String> assertServerConnectionOpenedTotal(int serverConnectionOpenedCount) {
return assertServerConnectionOpenedTotal(null, serverConnectionOpenedCount);
}

static Matcher<String> assertClientConnectionOpenedTotal(int clientConnectionOpenedCount) {
return assertClientConnectionOpenedTotal(null, clientConnectionOpenedCount);
}

static Matcher<String> assertClientMessagesCountBytesSent(int clientSentCountBytes) {
return assertClientMessagesCountBytesSent(null, clientSentCountBytes);
}

static Matcher<String> assertClientMessagesCountBytesReceived(int clientReceivedCountBytes) {
return assertClientMessagesCountBytesReceived(null, clientReceivedCountBytes);
}

static Matcher<String> assertClientMessagesCountSent(int clientSentCount) {
return assertClientMessagesCountSent(null, clientSentCount);
}

static Matcher<String> assertServerMessagesCountReceived(int serverReceivedCount) {
return assertServerMessagesCountReceived(null, serverReceivedCount);
}

static Matcher<String> assertServerMessagesCountBytesSent(int serverSentCountBytes) {
return assertServerMessagesCountBytesSent(null, serverSentCountBytes);
}

static Matcher<String> assertServerMessagesCountBytesReceived(int serverReceivedCountBytes) {
return assertServerMessagesCountBytesReceived(null, serverReceivedCountBytes);
}

static Matcher<String> assertServerErrorTotal(int serverErrorCount) {
return assertServerErrorTotal(null, serverErrorCount);
}

static Matcher<String> assertClientErrorTotal(int clientErrorCount) {
return assertClientErrorTotal(null, clientErrorCount);
}

static Matcher<String> assertServerConnectionOpenedTotal(String path, int serverConnectionOpenedCount) {
return assertTotal(SERVER_CONNECTION_OPENED, serverConnectionOpenedCount, path);
}

static Matcher<String> assertClientConnectionOpenedTotal(String path, int clientConnectionOpenedCount) {
return assertTotal(CLIENT_CONNECTION_OPENED, clientConnectionOpenedCount, path);
}

static Matcher<String> assertServerConnectionClosedTotal(String path, int serverConnectionClosedCount) {
return assertTotal(SERVER_CONNECTION_CLOSED, serverConnectionClosedCount, path);
}

static Matcher<String> assertClientConnectionClosedTotal(String path, int clientConnectionClosedCount) {
return assertTotal(CLIENT_CONNECTION_CLOSED, clientConnectionClosedCount, path);
}

private static Matcher<String> assertTotal(String metricKey, int expectedCount, String path) {
var prometheusFormatKey = "%s_total".formatted(toPrometheusFormat(metricKey));
return new BaseMatcher<>() {
@Override
public boolean matches(Object o) {
if (o instanceof String str) {
var sameKeyMultipleTags = str
.lines()
.filter(l -> l.contains(prometheusFormatKey))
.filter(l -> path == null || l.contains(path)) // filter by path
.map(String::trim)
.toList();
// quarkus_websockets_server_messages_count_received_total{<<some path tag>>} 2.0
// quarkus_websockets_server_messages_count_received_total{<<different path tag>>} 5.0
// = 7
var totalSum = sameKeyMultipleTags
.stream()
.map(l -> l.substring(l.lastIndexOf(" ")).trim())
.map(Double::parseDouble)
.map(Double::intValue)
.reduce(0, Integer::sum);
return totalSum == expectedCount;
}
return false;
}

@Override
public void describeTo(Description description) {
description.appendText("Key '%s' with value '%d'".formatted(prometheusFormatKey, expectedCount));
}
};
}

private static String toPrometheusFormat(String dottedMicrometerFormat) {
return dottedMicrometerFormat.replace(".", "_").replace("-", "_");
}

static int stringToBytes(String... messages) {
return Arrays.stream(messages).map(msg -> msg.getBytes(StandardCharsets.UTF_8)).map(s -> s.length).reduce(0,
Integer::sum);
}
}
Loading

0 comments on commit eb5f127

Please sign in to comment.