Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for NDJSON streaming in vertex-web #18182

Merged
merged 3 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@
import io.quarkus.gizmo.FieldCreator;
import io.quarkus.gizmo.MethodDescriptor;
import io.quarkus.gizmo.ResultHandle;
import io.quarkus.vertx.web.runtime.MultiJsonArraySupport;
import io.quarkus.vertx.web.runtime.MultiSseSupport;
import io.quarkus.vertx.web.runtime.MultiSupport;
import io.quarkus.vertx.web.runtime.RouteHandler;
import io.quarkus.vertx.web.runtime.RouteHandlers;
import io.quarkus.vertx.web.runtime.ValidationSupport;
import io.quarkus.vertx.web.runtime.*;
ntrp marked this conversation as resolved.
Show resolved Hide resolved
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniSubscribe;
Expand Down Expand Up @@ -114,6 +109,15 @@ class Methods {
"subscribeObject",
Void.TYPE, Multi.class, RoutingContext.class);

static final MethodDescriptor IS_NDJSON = MethodDescriptor.ofMethod(MultiNdjsonSupport.class, "isNdjson", Boolean.TYPE,
Multi.class);
static final MethodDescriptor MULTI_NDJSON_SUBSCRIBE_STRING = MethodDescriptor.ofMethod(MultiNdjsonSupport.class,
"subscribeString",
Void.TYPE, Multi.class, RoutingContext.class);
static final MethodDescriptor MULTI_NDJSON_SUBSCRIBE_OBJECT = MethodDescriptor.ofMethod(MultiNdjsonSupport.class,
"subscribeObject",
Void.TYPE, Multi.class, RoutingContext.class);

static final MethodDescriptor IS_JSON_ARRAY = MethodDescriptor.ofMethod(MultiJsonArraySupport.class, "isJsonArray",
Boolean.TYPE, Multi.class);
static final MethodDescriptor MULTI_JSON_SUBSCRIBE_VOID = MethodDescriptor.ofMethod(MultiJsonArraySupport.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,13 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met
isSSE.close();

BytecodeCreator isNotSSE = isItSSE.falseBranch();
BranchResult isItJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_JSON_ARRAY, res));
BranchResult isItNdJson = isNotSSE.ifTrue(isNotSSE.invokeStaticMethod(Methods.IS_NDJSON, res));
BytecodeCreator isNdjson = isItNdJson.trueBranch();
handleNdjsonMulti(descriptor, isNdjson, routingContext, res);
isNdjson.close();

BytecodeCreator isNotNdjson = isItNdJson.falseBranch();
BranchResult isItJson = isNotNdjson.ifTrue(isNotNdjson.invokeStaticMethod(Methods.IS_JSON_ARRAY, res));
BytecodeCreator isJson = isItJson.trueBranch();
handleJsonArrayMulti(descriptor, isJson, routingContext, res);
isJson.close();
Expand Down Expand Up @@ -918,6 +924,28 @@ private void handleSSEMulti(HandlerDescriptor descriptor, BytecodeCreator writer
}
}

private void handleNdjsonMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc,
ResultHandle res) {
// The method returns a Multi that needs to be written as server-sent event.
// We subscribe to this Multi and write the provided items (one by one) in the HTTP response.
// On completion, we "end" the response
// If the method returned null, we fail
// If the provided item is null we fail
// If the multi is empty, and the method return a Multi<Void>, we reply with a 204 - NO CONTENT (as regular)
// If the produced item is a string or buffer, the response.write method is used to write the events in the response
// If the produced item is an object, the item is mapped to JSON and included in the `data` section of the event.

if (Methods.isNoContent(descriptor)) { // Multi<Void> - so return a 204.
writer.invokeStaticMethod(Methods.MULTI_SUBSCRIBE_VOID, res, rc);
} else if (descriptor.isContentTypeString()) {
writer.invokeStaticMethod(Methods.MULTI_NDJSON_SUBSCRIBE_STRING, res, rc);
} else if (descriptor.isContentTypeBuffer() || descriptor.isContentTypeMutinyBuffer()) {
writer.invokeStaticMethod(Methods.MULTI_JSON_FAIL, rc);
} else { // Multi<Object> - encode to json.
writer.invokeStaticMethod(Methods.MULTI_NDJSON_SUBSCRIBE_OBJECT, res, rc);
}
}

private void handleJsonArrayMulti(HandlerDescriptor descriptor, BytecodeCreator writer, ResultHandle rc,
ResultHandle res) {
// The method returns a Multi that needs to be written as JSON Array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Objects;

import io.quarkus.vertx.web.runtime.JsonArrayMulti;
import io.quarkus.vertx.web.runtime.NdjsonMulti;
import io.quarkus.vertx.web.runtime.SSEMulti;
import io.smallrye.mutiny.Multi;

Expand Down Expand Up @@ -33,7 +34,7 @@ private ReactiveRoutes() {
* {@link ServerSentEvent#event()}.
* <p>
* Example of usage:
*
*
* <pre>
* &#64;Route(path = "/people")
* Multi&lt;Person&gt; people(RoutingContext context) {
Expand All @@ -52,6 +53,43 @@ public static <T> Multi<T> asEventStream(Multi<T> multi) {
return new SSEMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`"));
}

/**
* Indicates the the given stream should be written as a Json stream in the response.
* Returning a {@code multi} wrapped using this method produces a {@code application/x-ndjson} response. Each item
* is written as an serialized json on a new line in the response. The response automatically enables the chunked
* encoding and set the content type.
* <p>
* If the item is a String, the content will be wrapped in quotes and written.
* If the item is an Object, then the JSON representation of this object will be written.
* <p>
* Example of usage:
*
* <pre>
* &#64;Route(path = "/people")
* Multi&lt;Person&gt; people(RoutingContext context) {
* return ReactiveRoutes.asJsonStream(Multi.createFrom().items(
* new Person("superman", 1),
* new Person("batman", 2),
* new Person("spiderman", 3)));
* }
* </pre>
*
* This example produces:
*
* <pre>
* {"name":"superman", "id":1}
* {...}
* {...}
* </pre>
*
* @param multi the multi to be written
* @param <T> the type of item, can be string, object
* @return the wrapped multi
*/
public static <T> Multi<T> asJsonStream(Multi<T> multi) {
return new NdjsonMulti<>(Objects.requireNonNull(multi, "The passed multi must not be `null`"));
}

/**
* Indicates the the given stream should be written as a <em>chunked</em> JSON array in the response.
* Returning a {@code multi} wrapped using this method produces a {@code application/json} response. Each item
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.quarkus.vertx.web.runtime;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.smallrye.mutiny.Multi;
import io.vertx.core.AsyncResult;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;

@SuppressWarnings("ReactiveStreamsSubscriberImplementation")
public class MultiNdjsonSupport {

private MultiNdjsonSupport() {
// Avoid direct instantiation.
}

private static void initialize(HttpServerResponse response, RoutingContext rc) {
if (response.bytesWritten() == 0) {
MultiMap headers = response.headers();
if (rc.getAcceptableContentType() == null) {
headers.set(HttpHeaders.CONTENT_TYPE, "application/x-ndjson");
}
response.setChunked(true);
}
}

public static void subscribeString(Multi<String> multi, RoutingContext rc) {
write(multi.map(s -> Buffer.buffer("\"" + s + "\"")), rc);
}

public static void subscribeObject(Multi<Object> multi, RoutingContext rc) {
write(multi.map(o -> Buffer.buffer(Json.encode(o) + "\n")), rc);
}

private static void onWriteDone(Subscription subscription, AsyncResult<Void> ar, RoutingContext rc) {
if (ar.failed()) {
rc.fail(ar.cause());
} else {
subscription.request(1);
}
}

public static void write(Multi<Buffer> multi, RoutingContext rc) {
HttpServerResponse response = rc.response();
multi.subscribe().withSubscriber(new Subscriber<Buffer>() {
Subscription upstream;

@Override
public void onSubscribe(Subscription subscription) {
this.upstream = subscription;
this.upstream.request(1);
}

@Override
public void onNext(Buffer item) {
initialize(response, rc);
response.write(item, ar -> onWriteDone(upstream, ar, rc));
}

@Override
public void onError(Throwable throwable) {
rc.fail(throwable);
}

@Override
public void onComplete() {
endOfStream(response, rc);
}
});
}

private static void endOfStream(HttpServerResponse response, RoutingContext rc) {
if (response.bytesWritten() == 0) { // No item
MultiMap headers = response.headers();
if (rc.getAcceptableContentType() == null) {
headers.set(HttpHeaders.CONTENT_TYPE, "application/x-ndjson");
}
}
response.end();
}

public static boolean isNdjson(Multi<?> multi) {
return multi instanceof NdjsonMulti;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkus.vertx.web.runtime;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.subscription.MultiSubscriber;

/**
* Just a wrapped to capture the fact that the items must be written as SSE.
*
* @param <T> the type of item.
*/
public class NdjsonMulti<T> extends AbstractMulti<T> {

private final Multi<T> multi;

public NdjsonMulti(Multi<T> multi) {
this.multi = multi;
}

@Override
public void subscribe(MultiSubscriber<? super T> subscriber) {
multi.subscribe(Infrastructure.onMultiSubscription(multi, subscriber));
}
}