Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JSON streaming for RESTEasy Reactive Jsonb and Jackson #20908

Merged
merged 2 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/src/main/asciidoc/kafka-schema-registry-avro.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ import javax.ws.rs.core.MediaType;

import org.acme.kafka.quarkus.Movie;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.jboss.resteasy.reactive.RestSseElementType;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;

Expand All @@ -242,7 +242,7 @@ public class ConsumedMovieResource {

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestSseElementType(MediaType.TEXT_PLAIN)
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<String> stream() {
return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear()));
}
Expand Down
6 changes: 3 additions & 3 deletions docs/src/main/asciidoc/resteasy-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ https://html.spec.whatwg.org/multipage/server-sent-events.html[Server-Sent Event
by just annotating your endpoint method with
link:{jaxrsapi}/javax/ws/rs/Produces.html[`@Produces(MediaType.SERVER_SENT_EVENTS)`]
and specifying that each element should be <<json,serialised to JSON>> with
`@RestSseElementType(MediaType.APPLICATION_JSON)`.
`@RestStreamElementType(MediaType.APPLICATION_JSON)`.

[source,java]
----
Expand All @@ -720,7 +720,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.resteasy.reactive.RestSseElementType;
import org.jboss.resteasy.reactive.RestStreamElementType;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
Expand All @@ -739,7 +739,7 @@ public class Endpoint {
// Send the stream over SSE
@Produces(MediaType.SERVER_SENT_EVENTS)
// Each element will be sent as JSON
@RestSseElementType(MediaType.APPLICATION_JSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Book> stream() {
return books;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@
<artifactId>quarkus-hibernate-validator-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jaxrs-client-reactive-deployment</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.common.model.ResourceMethod;
import org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames;
import org.jboss.resteasy.reactive.common.util.RestMediaType;
import org.jboss.resteasy.reactive.server.util.MethodId;

import com.fasterxml.jackson.annotation.JsonView;
Expand Down Expand Up @@ -137,15 +138,18 @@ void additionalProviders(List<JacksonFeatureBuildItem> jacksonFeatureBuildItems,
additionalWriters
.produce(new MessageBodyWriterBuildItem(getJacksonMessageBodyWriter(applicationNeedsSpecialJacksonFeatures),
Object.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON,
RestMediaType.APPLICATION_STREAM_JSON)));
additionalWriters
.produce(new MessageBodyWriterBuildItem(VertxJsonArrayMessageBodyWriter.class.getName(),
JsonArray.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON,
RestMediaType.APPLICATION_STREAM_JSON)));
additionalWriters
.produce(new MessageBodyWriterBuildItem(VertxJsonObjectMessageBodyWriter.class.getName(),
JsonObject.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON,
RestMediaType.APPLICATION_STREAM_JSON)));
}

private String getJacksonMessageBodyWriter(boolean applicationNeedsSpecialJacksonFeatures) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;

public class Message {
public String name;

public Message(String name) {
this.name = name;
}

// for Jsonb
public Message() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;

import java.io.IOException;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;

import org.jboss.resteasy.reactive.RestStreamElementType;
import org.jboss.resteasy.reactive.common.util.RestMediaType;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;

@Path("sse")
public class SseResource {

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sse(Sse sse, SseEventSink sink) {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();

sseBroadcaster.register(sink);
sseBroadcaster.broadcast(sse.newEventBuilder().data("hello").build())
.thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data("stef").build()))
.thenAccept(v -> sseBroadcaster.close());
}

@Path("multi")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> multiText() {
return Multi.createFrom().items("hello", "stef");
}

@Path("json")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public void sseJson(Sse sse, SseEventSink sink) throws IOException {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();

sseBroadcaster.register(sink);
sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("hello")).build())
.thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("stef")).build()))
.thenAccept(v -> sseBroadcaster.close());
}

@Blocking
@Path("blocking/json")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public void blockingSseJson(Sse sse, SseEventSink sink) throws IOException {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();

sseBroadcaster.register(sink);
sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("hello")).build())
.thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("stef")).build()))
.thenAccept(v -> sseBroadcaster.close());
}

@Path("json2")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sseJson2(Sse sse, SseEventSink sink) throws IOException {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();

// Same as sseJson but set mediaType in builder
sseBroadcaster.register(sink);
sseBroadcaster
.broadcast(sse.newEventBuilder().data(new Message("hello")).mediaType(MediaType.APPLICATION_JSON_TYPE).build())
.thenCompose(v -> sseBroadcaster.broadcast(
sse.newEventBuilder().mediaType(MediaType.APPLICATION_JSON_TYPE).data(new Message("stef")).build()))
.thenAccept(v -> sseBroadcaster.close());
}

@Path("json/multi")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("json/multi2")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Message> multiDefaultElementType() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("ndjson/multi")
@GET
@Produces(RestMediaType.APPLICATION_NDJSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiNdJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("stream-json/multi")
@GET
@Produces(RestMediaType.APPLICATION_STREAM_JSON)
@RestStreamElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiStreamJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;

import static io.restassured.RestAssured.when;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;

import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;

import org.apache.http.HttpStatus;
import org.jboss.resteasy.reactive.client.impl.MultiInvoker;
import org.jboss.resteasy.reactive.common.util.RestMediaType;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.smallrye.mutiny.Multi;

public class SseTestCase {

@TestHTTPResource
URI uri;

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(SseResource.class, Message.class));

@Test
public void testSseFromSse() throws Exception {
testSse("sse");
}

@Test
public void testSseFromMulti() throws Exception {
testSse("sse/multi");
}

private void testSse(String path) throws Exception {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + path);
// do not reconnect
try (SseEventSource eventSource = SseEventSource.target(target).reconnectingEvery(Integer.MAX_VALUE, TimeUnit.SECONDS)
.build()) {
CompletableFuture<List<String>> res = new CompletableFuture<>();
List<String> collect = Collections.synchronizedList(new ArrayList<>());
eventSource.register(new Consumer<InboundSseEvent>() {
@Override
public void accept(InboundSseEvent inboundSseEvent) {
collect.add(inboundSseEvent.readData());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
res.completeExceptionally(throwable);
}
}, () -> {
res.complete(collect);
});
eventSource.open();
assertThat(res.get(5, TimeUnit.SECONDS)).containsExactly("hello", "stef");
}
}

@Test
public void testMultiFromSse() {
testMulti("sse");
}

@Test
public void testMultiFromMulti() {
testMulti("sse/multi");
}

private void testMulti(String path) {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + path);
Multi<String> multi = target.request().rx(MultiInvoker.class).get(String.class);
List<String> list = multi.collect().asList().await().atMost(Duration.ofSeconds(30));
assertThat(list).containsExactly("hello", "stef");
}

@Test
public void testJsonMultiFromSse() {
testJsonMulti("sse/json");
testJsonMulti("sse/json2");
testJsonMulti("sse/blocking/json");
}

@Test
public void testJsonMultiFromMulti() {
testJsonMulti("sse/json/multi");
}

@Test
public void testJsonMultiFromMultiWithDefaultElementType() {
testJsonMulti("sse/json/multi2");
}

@Test
public void testNdJsonMultiFromMulti() {
when().get(uri.toString() + "sse/ndjson/multi")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}/n"
+ "{\"name\":\"stef\"}/n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON));
}

@Test
public void testStreamJsonMultiFromMulti() {
when().get(uri.toString() + "sse/stream-json/multi")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}/n"
+ "{\"name\":\"stef\"}/n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_STREAM_JSON));
}

private void testJsonMulti(String path) {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + path);
Multi<Message> multi = target.request().rx(MultiInvoker.class).get(Message.class);
List<Message> list = multi.collect().asList().await().atMost(Duration.ofSeconds(30));
assertThat(list).extracting("name").containsExactly("hello", "stef");
}
}
Loading