From 6648510d38f0dcb728836292157ae94aa5c79780 Mon Sep 17 00:00:00 2001 From: Jose Date: Thu, 20 Jan 2022 07:41:50 +0100 Subject: [PATCH 1/2] Add JSON streaming for RESTEasy Reactive Jsonb and Jackson We now support json streaming. Example: ``` @Path("stream-json/multi") @GET @Produces(SseMediaType.APPLICATION_STREAM_JSON) @RestSseElementType(MediaType.APPLICATION_JSON) public Multi multiStreamJson() { return Multi.createFrom().items(new Message("hello"), new Message("stef")); } ``` We support "application/x-ndjson" and "application/stream+json". Moreover, I've added a new utility class called `SseMediaType` to be used by users. The implementation is similar to `MediaType`. --- .../deployment/pom.xml | 10 ++ .../ResteasyReactiveJacksonProcessor.java | 10 +- .../jackson/deployment/test/sse/Message.java | 13 ++ .../deployment/test/sse/SseResource.java | 125 +++++++++++++++ .../deployment/test/sse/SseTestCase.java | 146 ++++++++++++++++++ .../ResteasyReactiveJsonbProcessor.java | 3 +- .../deployment/test/sse/SseResource.java | 17 ++ .../deployment/test/sse/SseTestCase.java | 28 ++++ .../reactive/common/util/RestMediaType.java | 18 +++ .../handlers/PublisherResponseHandler.java | 87 +++++++++-- 10 files changed, 439 insertions(+), 18 deletions(-) create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/Message.java create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java create mode 100644 extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseTestCase.java create mode 100644 independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/pom.xml b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/pom.xml index d89c9b3b3e4c2..87da39c8b69fc 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/pom.xml +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/pom.xml @@ -40,6 +40,16 @@ quarkus-hibernate-validator-deployment test + + org.assertj + assertj-core + test + + + io.quarkus + quarkus-jaxrs-client-reactive-deployment + test + diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java index 2df5d371d6ccd..4aeaac1a2d00e 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/main/java/io/quarkus/resteasy/reactive/jackson/deployment/processor/ResteasyReactiveJacksonProcessor.java @@ -23,6 +23,7 @@ import org.jboss.logging.Logger; import org.jboss.resteasy.reactive.common.model.ResourceMethod; import org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.jboss.resteasy.reactive.server.util.MethodId; import com.fasterxml.jackson.annotation.JsonView; @@ -137,15 +138,18 @@ void additionalProviders(List jacksonFeatureBuildItems, additionalWriters .produce(new MessageBodyWriterBuildItem(getJacksonMessageBodyWriter(applicationNeedsSpecialJacksonFeatures), Object.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + RestMediaType.APPLICATION_STREAM_JSON))); additionalWriters .produce(new MessageBodyWriterBuildItem(VertxJsonArrayMessageBodyWriter.class.getName(), JsonArray.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + RestMediaType.APPLICATION_STREAM_JSON))); additionalWriters .produce(new MessageBodyWriterBuildItem(VertxJsonObjectMessageBodyWriter.class.getName(), JsonObject.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, + RestMediaType.APPLICATION_STREAM_JSON))); } private String getJacksonMessageBodyWriter(boolean applicationNeedsSpecialJacksonFeatures) { diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/Message.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/Message.java new file mode 100644 index 0000000000000..c09c92acb9672 --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/Message.java @@ -0,0 +1,13 @@ +package io.quarkus.resteasy.reactive.jackson.deployment.test.sse; + +public class Message { + public String name; + + public Message(String name) { + this.name = name; + } + + // for Jsonb + public Message() { + } +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java new file mode 100644 index 0000000000000..f1622e35051a3 --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java @@ -0,0 +1,125 @@ +package io.quarkus.resteasy.reactive.jackson.deployment.test.sse; + +import java.io.IOException; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.sse.Sse; +import javax.ws.rs.sse.SseBroadcaster; +import javax.ws.rs.sse.SseEventSink; + +import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.common.util.RestMediaType; + +import io.smallrye.common.annotation.Blocking; +import io.smallrye.mutiny.Multi; + +@Path("sse") +public class SseResource { + + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sse(Sse sse, SseEventSink sink) { + if (sink == null) { + throw new IllegalStateException("No client connected."); + } + SseBroadcaster sseBroadcaster = sse.newBroadcaster(); + + sseBroadcaster.register(sink); + sseBroadcaster.broadcast(sse.newEventBuilder().data("hello").build()) + .thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data("stef").build())) + .thenAccept(v -> sseBroadcaster.close()); + } + + @Path("multi") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi multiText() { + return Multi.createFrom().items("hello", "stef"); + } + + @Path("json") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @RestSseElementType(MediaType.APPLICATION_JSON) + public void sseJson(Sse sse, SseEventSink sink) throws IOException { + if (sink == null) { + throw new IllegalStateException("No client connected."); + } + SseBroadcaster sseBroadcaster = sse.newBroadcaster(); + + sseBroadcaster.register(sink); + sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("hello")).build()) + .thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("stef")).build())) + .thenAccept(v -> sseBroadcaster.close()); + } + + @Blocking + @Path("blocking/json") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @RestSseElementType(MediaType.APPLICATION_JSON) + public void blockingSseJson(Sse sse, SseEventSink sink) throws IOException { + if (sink == null) { + throw new IllegalStateException("No client connected."); + } + SseBroadcaster sseBroadcaster = sse.newBroadcaster(); + + sseBroadcaster.register(sink); + sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("hello")).build()) + .thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("stef")).build())) + .thenAccept(v -> sseBroadcaster.close()); + } + + @Path("json2") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public void sseJson2(Sse sse, SseEventSink sink) throws IOException { + if (sink == null) { + throw new IllegalStateException("No client connected."); + } + SseBroadcaster sseBroadcaster = sse.newBroadcaster(); + + // Same as sseJson but set mediaType in builder + sseBroadcaster.register(sink); + sseBroadcaster + .broadcast(sse.newEventBuilder().data(new Message("hello")).mediaType(MediaType.APPLICATION_JSON_TYPE).build()) + .thenCompose(v -> sseBroadcaster.broadcast( + sse.newEventBuilder().mediaType(MediaType.APPLICATION_JSON_TYPE).data(new Message("stef")).build())) + .thenAccept(v -> sseBroadcaster.close()); + } + + @Path("json/multi") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + + @Path("json/multi2") + @GET + @Produces(MediaType.SERVER_SENT_EVENTS) + public Multi multiDefaultElementType() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + + @Path("ndjson/multi") + @GET + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiNdJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + + @Path("stream-json/multi") + @GET + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiStreamJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseTestCase.java new file mode 100644 index 0000000000000..1f29b42de959c --- /dev/null +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseTestCase.java @@ -0,0 +1,146 @@ +package io.quarkus.resteasy.reactive.jackson.deployment.test.sse; + +import static io.restassured.RestAssured.when; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.sse.InboundSseEvent; +import javax.ws.rs.sse.SseEventSource; + +import org.apache.http.HttpStatus; +import org.jboss.resteasy.reactive.client.impl.MultiInvoker; +import org.jboss.resteasy.reactive.common.util.RestMediaType; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.smallrye.mutiny.Multi; + +public class SseTestCase { + + @TestHTTPResource + URI uri; + + @RegisterExtension + static final QuarkusUnitTest config = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class) + .addClasses(SseResource.class, Message.class)); + + @Test + public void testSseFromSse() throws Exception { + testSse("sse"); + } + + @Test + public void testSseFromMulti() throws Exception { + testSse("sse/multi"); + } + + private void testSse(String path) throws Exception { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(uri.toString() + path); + // do not reconnect + try (SseEventSource eventSource = SseEventSource.target(target).reconnectingEvery(Integer.MAX_VALUE, TimeUnit.SECONDS) + .build()) { + CompletableFuture> res = new CompletableFuture<>(); + List collect = Collections.synchronizedList(new ArrayList<>()); + eventSource.register(new Consumer() { + @Override + public void accept(InboundSseEvent inboundSseEvent) { + collect.add(inboundSseEvent.readData()); + } + }, new Consumer() { + @Override + public void accept(Throwable throwable) { + res.completeExceptionally(throwable); + } + }, () -> { + res.complete(collect); + }); + eventSource.open(); + assertThat(res.get(5, TimeUnit.SECONDS)).containsExactly("hello", "stef"); + } + } + + @Test + public void testMultiFromSse() { + testMulti("sse"); + } + + @Test + public void testMultiFromMulti() { + testMulti("sse/multi"); + } + + private void testMulti(String path) { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(uri.toString() + path); + Multi multi = target.request().rx(MultiInvoker.class).get(String.class); + List list = multi.collect().asList().await().atMost(Duration.ofSeconds(30)); + assertThat(list).containsExactly("hello", "stef"); + } + + @Test + public void testJsonMultiFromSse() { + testJsonMulti("sse/json"); + testJsonMulti("sse/json2"); + testJsonMulti("sse/blocking/json"); + } + + @Test + public void testJsonMultiFromMulti() { + testJsonMulti("sse/json/multi"); + } + + @Test + public void testJsonMultiFromMultiWithDefaultElementType() { + testJsonMulti("sse/json/multi2"); + } + + @Test + public void testNdJsonMultiFromMulti() { + when().get(uri.toString() + "sse/ndjson/multi") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}/n" + + "{\"name\":\"stef\"}/n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON)); + } + + @Test + public void testStreamJsonMultiFromMulti() { + when().get(uri.toString() + "sse/stream-json/multi") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}/n" + + "{\"name\":\"stef\"}/n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_STREAM_JSON)); + } + + private void testJsonMulti(String path) { + Client client = ClientBuilder.newBuilder().build(); + WebTarget target = client.target(uri.toString() + path); + Multi multi = target.request().rx(MultiInvoker.class).get(Message.class); + List list = multi.collect().asList().await().atMost(Duration.ofSeconds(30)); + assertThat(list).extracting("name").containsExactly("hello", "stef"); + } +} diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/main/java/io/quarkus/resteasy/reactive/jsonb/deployment/processor/ResteasyReactiveJsonbProcessor.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/main/java/io/quarkus/resteasy/reactive/jsonb/deployment/processor/ResteasyReactiveJsonbProcessor.java index 44d18f62cdbc7..d9af2a611b97b 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/main/java/io/quarkus/resteasy/reactive/jsonb/deployment/processor/ResteasyReactiveJsonbProcessor.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/main/java/io/quarkus/resteasy/reactive/jsonb/deployment/processor/ResteasyReactiveJsonbProcessor.java @@ -5,6 +5,7 @@ import javax.ws.rs.core.MediaType; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.jboss.resteasy.reactive.server.jsonb.JsonbMessageBodyReader; import org.jboss.resteasy.reactive.server.jsonb.JsonbMessageBodyWriter; @@ -53,7 +54,7 @@ void additionalProviders(BuildProducer additionalBean, additionalReaders.produce(new MessageBodyReaderBuildItem(JsonbMessageBodyReader.class.getName(), Object.class.getName(), Collections.singletonList(MediaType.APPLICATION_JSON))); additionalWriters.produce(new MessageBodyWriterBuildItem(JsonbMessageBodyWriter.class.getName(), Object.class.getName(), - Collections.singletonList(MediaType.APPLICATION_JSON))); + List.of(MediaType.APPLICATION_JSON, RestMediaType.APPLICATION_NDJSON, RestMediaType.APPLICATION_STREAM_JSON))); } @BuildStep diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java index 3a3d4619a81d0..eff18d0e526d7 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java @@ -11,6 +11,7 @@ import javax.ws.rs.sse.SseEventSink; import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import io.smallrye.common.annotation.Blocking; import io.smallrye.mutiny.Multi; @@ -105,4 +106,20 @@ public Multi multiDefaultElementType() { return Multi.createFrom().items(new Message("hello"), new Message("stef")); } + @Path("ndjson/multi") + @GET + @Produces(RestMediaType.APPLICATION_NDJSON) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiNdJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + + @Path("stream-json/multi") + @GET + @Produces(RestMediaType.APPLICATION_STREAM_JSON) + @RestSseElementType(MediaType.APPLICATION_JSON) + public Multi multiStreamJson() { + return Multi.createFrom().items(new Message("hello"), new Message("stef")); + } + } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseTestCase.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseTestCase.java index 31df8d75355c6..9e00b014c544e 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseTestCase.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseTestCase.java @@ -1,6 +1,9 @@ package io.quarkus.resteasy.reactive.jsonb.deployment.test.sse; +import static io.restassured.RestAssured.when; import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; import java.net.URI; import java.time.Duration; @@ -14,10 +17,13 @@ import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.sse.InboundSseEvent; import javax.ws.rs.sse.SseEventSource; +import org.apache.http.HttpStatus; import org.jboss.resteasy.reactive.client.impl.MultiInvoker; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -106,6 +112,28 @@ public void testJsonMultiFromMultiWithDefaultElementType() { testJsonMulti("sse/json/multi2"); } + @Test + public void testNdJsonMultiFromMulti() { + when().get(uri.toString() + "sse/ndjson/multi") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}/n" + + "{\"name\":\"stef\"}/n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_NDJSON)); + } + + @Test + public void testStreamJsonMultiFromMulti() { + when().get(uri.toString() + "sse/stream-json/multi") + .then().statusCode(HttpStatus.SC_OK) + // @formatter:off + .body(is("{\"name\":\"hello\"}/n" + + "{\"name\":\"stef\"}/n")) + // @formatter:on + .header(HttpHeaders.CONTENT_TYPE, containsString(RestMediaType.APPLICATION_STREAM_JSON)); + } + private void testJsonMulti(String path) { Client client = ClientBuilder.newBuilder().build(); WebTarget target = client.target(uri.toString() + path); diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java new file mode 100644 index 0000000000000..f2a034bb46558 --- /dev/null +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java @@ -0,0 +1,18 @@ +package org.jboss.resteasy.reactive.common.util; + +import javax.ws.rs.core.MediaType; + +/** + * Extended media types in Resteasy Reactive. + */ +public final class RestMediaType { + + public static final String APPLICATION_NDJSON = "application/x-ndjson"; + public static final MediaType APPLICATION_NDJSON_TYPE = new MediaType("application", "x-ndjson"); + public static final String APPLICATION_STREAM_JSON = "application/stream+json"; + public static final MediaType APPLICATION_STREAM_JSON_TYPE = new MediaType("application", "stream+json"); + + private RestMediaType() { + + } +} diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java index aa474d8ec2cff..43ddb8b91cf78 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/handlers/PublisherResponseHandler.java @@ -7,6 +7,7 @@ import java.util.function.BiFunction; import javax.ws.rs.core.MediaType; import org.jboss.logging.Logger; +import org.jboss.resteasy.reactive.common.util.RestMediaType; import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; import org.jboss.resteasy.reactive.server.core.SseUtil; import org.jboss.resteasy.reactive.server.core.StreamingUtil; @@ -25,6 +26,8 @@ */ public class PublisherResponseHandler implements ServerRestHandler { + private static final String JSON = "json"; + private List streamingResponseCustomizers = Collections.emptyList(); public void setStreamingResponseCustomizers(List streamingResponseCustomizers) { @@ -57,6 +60,35 @@ public Object apply(Object v, Throwable t) { } } + private static class ChunkedStreamingMultiSubscriber extends StreamingMultiSubscriber { + + private static final String LINE_SEPARATOR = "/n"; + + private boolean isFirstItem = true; + + ChunkedStreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, + List customizers, boolean json) { + super(requestContext, customizers, json); + } + + @Override + protected String messagePrefix() { + // When message is chunked, we don't need to add prefixes at first + if (isFirstItem) { + isFirstItem = false; + return null; + } + + // If it's not the first message, we need to append the messages with end of line delimiter. + return LINE_SEPARATOR; + } + + @Override + protected String onCompleteText() { + return LINE_SEPARATOR; + } + } + private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { // Huge hack to stream valid json @@ -75,7 +107,7 @@ private static class StreamingMultiSubscriber extends AbstractMultiSubscriber { @Override public void onNext(Object item) { hadItem = true; - StreamingUtil.send(requestContext, customizers, item, json ? nextJsonPrefix : null) + StreamingUtil.send(requestContext, customizers, item, messagePrefix()) .handle(new BiFunction() { @Override public Object apply(Object v, Throwable t) { @@ -104,13 +136,7 @@ public void onComplete() { StreamingUtil.setHeaders(requestContext, requestContext.serverResponse(), customizers); } if (json) { - String postfix; - // check if we never sent the open prefix - if (!hadItem) { - postfix = "[]"; - } else { - postfix = "]"; - } + String postfix = onCompleteText(); byte[] postfixBytes = postfix.getBytes(StandardCharsets.US_ASCII); requestContext.serverResponse().write(postfixBytes).handle((v, t) -> { super.onComplete(); @@ -120,6 +146,23 @@ public void onComplete() { super.onComplete(); } } + + protected String onCompleteText() { + String postfix; + // check if we never sent the open prefix + if (!hadItem) { + postfix = "[]"; + } else { + postfix = "]"; + } + + return postfix; + } + + protected String messagePrefix() { + // if it's json, the message prefix starts with `[`. + return json ? nextJsonPrefix : null; + } } static abstract class AbstractMultiSubscriber implements Subscriber { @@ -197,24 +240,40 @@ public void handle(ResteasyReactiveRequestContext requestContext) throws Excepti // media type negotiation and fixed entity writer set up, perhaps it's better than // cancelling the normal route? // or make this SSE produce build-time - MediaType[] mediaTypes = requestContext.getTarget().getProduces().getSortedMediaTypes(); - if (mediaTypes.length != 1) + MediaType[] mediaTypes = requestContext.getTarget().getProduces().getSortedOriginalMediaTypes(); + if (mediaTypes.length != 1) { throw new IllegalStateException( "Negotiation or dynamic media type not supported yet for Multi: please use a single @Produces annotation"); - requestContext.setResponseContentType(mediaTypes[0]); + } + + MediaType mediaType = mediaTypes[0]; + requestContext.setResponseContentType(mediaType); // this is the non-async return type requestContext.setGenericReturnType(requestContext.getTarget().getReturnType()); // we have several possibilities here, but in all we suspend requestContext.suspend(); - if (mediaTypes[0].isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) { + if (mediaType.isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) { handleSse(requestContext, result); } else { - boolean json = mediaTypes[0].isCompatible(MediaType.APPLICATION_JSON_TYPE); - handleStreaming(requestContext, result, json); + boolean json = mediaType.toString().contains(JSON); + if (requiresChunkedStream(mediaType)) { + handleChunkedStreaming(requestContext, result, json); + } else { + handleStreaming(requestContext, result, json); + } } } } + private boolean requiresChunkedStream(MediaType mediaType) { + return mediaType.isCompatible(RestMediaType.APPLICATION_NDJSON_TYPE) + || mediaType.isCompatible(RestMediaType.APPLICATION_STREAM_JSON_TYPE); + } + + private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { + result.subscribe(new ChunkedStreamingMultiSubscriber(requestContext, streamingResponseCustomizers, json)); + } + private void handleStreaming(ResteasyReactiveRequestContext requestContext, Publisher result, boolean json) { result.subscribe(new StreamingMultiSubscriber(requestContext, streamingResponseCustomizers, json)); } From d684ebe6f152b2f31be8dfdb7b1996f475f8028f Mon Sep 17 00:00:00 2001 From: Jose Date: Thu, 20 Jan 2022 12:07:43 +0100 Subject: [PATCH 2/2] Deprecate @RestSseElementType in favour of @RestStreamElementType --- .../asciidoc/kafka-schema-registry-avro.adoc | 4 +- docs/src/main/asciidoc/resteasy-reactive.adoc | 6 +- .../deployment/test/sse/SseResource.java | 12 ++-- .../jaxb/deployment/test/SseResourceTest.java | 6 +- .../deployment/test/sse/SseResource.java | 1 + .../QuarkusServerEndpointIndexer.java | 2 +- .../common/processor/EndpointIndexer.java | 55 ++++++++++++------- .../processor/ResteasyReactiveDotNames.java | 2 + .../resteasy/reactive/RestSseElementType.java | 3 + .../reactive/RestStreamElementType.java | 19 +++++++ .../reactive/common/model/ResourceMethod.java | 18 +++--- .../reactive/common/util/RestMediaType.java | 10 ++-- .../reactive/server/core/SseUtil.java | 6 +- .../startup/RuntimeResourceDeployment.java | 9 +-- .../server/mapping/RuntimeResource.java | 12 ++-- .../server/model/ServerResourceMethod.java | 4 +- 16 files changed, 106 insertions(+), 63 deletions(-) create mode 100644 independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestStreamElementType.java diff --git a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc index 1521fe89a7dca..1a65fcf651617 100644 --- a/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc +++ b/docs/src/main/asciidoc/kafka-schema-registry-avro.adoc @@ -229,7 +229,7 @@ import javax.ws.rs.core.MediaType; import org.acme.kafka.quarkus.Movie; import org.eclipse.microprofile.reactive.messaging.Channel; -import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.RestStreamElementType; import io.smallrye.mutiny.Multi; @@ -242,7 +242,7 @@ public class ConsumedMovieResource { @GET @Produces(MediaType.SERVER_SENT_EVENTS) - @RestSseElementType(MediaType.TEXT_PLAIN) + @RestStreamElementType(MediaType.TEXT_PLAIN) public Multi stream() { return movies.map(movie -> String.format("'%s' from %s", movie.getTitle(), movie.getYear())); } diff --git a/docs/src/main/asciidoc/resteasy-reactive.adoc b/docs/src/main/asciidoc/resteasy-reactive.adoc index f1c60c4e0de71..1e6ff66dcdd12 100644 --- a/docs/src/main/asciidoc/resteasy-reactive.adoc +++ b/docs/src/main/asciidoc/resteasy-reactive.adoc @@ -708,7 +708,7 @@ https://html.spec.whatwg.org/multipage/server-sent-events.html[Server-Sent Event by just annotating your endpoint method with link:{jaxrsapi}/javax/ws/rs/Produces.html[`@Produces(MediaType.SERVER_SENT_EVENTS)`] and specifying that each element should be <> with -`@RestSseElementType(MediaType.APPLICATION_JSON)`. +`@RestStreamElementType(MediaType.APPLICATION_JSON)`. [source,java] ---- @@ -720,7 +720,7 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; -import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.RestStreamElementType; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; @@ -739,7 +739,7 @@ public class Endpoint { // Send the stream over SSE @Produces(MediaType.SERVER_SENT_EVENTS) // Each element will be sent as JSON - @RestSseElementType(MediaType.APPLICATION_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi stream() { return books; } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java index f1622e35051a3..c182a2f6281e3 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jackson/deployment/src/test/java/io/quarkus/resteasy/reactive/jackson/deployment/test/sse/SseResource.java @@ -10,7 +10,7 @@ import javax.ws.rs.sse.SseBroadcaster; import javax.ws.rs.sse.SseEventSink; -import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.RestStreamElementType; import org.jboss.resteasy.reactive.common.util.RestMediaType; import io.smallrye.common.annotation.Blocking; @@ -43,7 +43,7 @@ public Multi multiText() { @Path("json") @GET @Produces(MediaType.SERVER_SENT_EVENTS) - @RestSseElementType(MediaType.APPLICATION_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) public void sseJson(Sse sse, SseEventSink sink) throws IOException { if (sink == null) { throw new IllegalStateException("No client connected."); @@ -60,7 +60,7 @@ public void sseJson(Sse sse, SseEventSink sink) throws IOException { @Path("blocking/json") @GET @Produces(MediaType.SERVER_SENT_EVENTS) - @RestSseElementType(MediaType.APPLICATION_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) public void blockingSseJson(Sse sse, SseEventSink sink) throws IOException { if (sink == null) { throw new IllegalStateException("No client connected."); @@ -94,7 +94,7 @@ public void sseJson2(Sse sse, SseEventSink sink) throws IOException { @Path("json/multi") @GET @Produces(MediaType.SERVER_SENT_EVENTS) - @RestSseElementType(MediaType.APPLICATION_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi multiJson() { return Multi.createFrom().items(new Message("hello"), new Message("stef")); } @@ -109,7 +109,7 @@ public Multi multiDefaultElementType() { @Path("ndjson/multi") @GET @Produces(RestMediaType.APPLICATION_NDJSON) - @RestSseElementType(MediaType.APPLICATION_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi multiNdJson() { return Multi.createFrom().items(new Message("hello"), new Message("stef")); } @@ -117,7 +117,7 @@ public Multi multiNdJson() { @Path("stream-json/multi") @GET @Produces(RestMediaType.APPLICATION_STREAM_JSON) - @RestSseElementType(MediaType.APPLICATION_JSON) + @RestStreamElementType(MediaType.APPLICATION_JSON) public Multi multiStreamJson() { return Multi.createFrom().items(new Message("hello"), new Message("stef")); } diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/test/java/io/quarkus/resteasy/reactive/jaxb/deployment/test/SseResourceTest.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/test/java/io/quarkus/resteasy/reactive/jaxb/deployment/test/SseResourceTest.java index c06e49f2305aa..50f0d61b1731f 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/test/java/io/quarkus/resteasy/reactive/jaxb/deployment/test/SseResourceTest.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jaxb/deployment/src/test/java/io/quarkus/resteasy/reactive/jaxb/deployment/test/SseResourceTest.java @@ -26,7 +26,7 @@ import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; -import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.RestStreamElementType; import org.jboss.resteasy.reactive.client.impl.MultiInvoker; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -154,7 +154,7 @@ public Multi multiText() { @Path("xml") @GET @Produces(MediaType.SERVER_SENT_EVENTS) - @RestSseElementType(MediaType.APPLICATION_XML) + @RestStreamElementType(MediaType.APPLICATION_XML) public void sseXml(Sse sse, SseEventSink sink) { if (sink == null) { throw new IllegalStateException("No client connected."); @@ -170,7 +170,7 @@ public void sseXml(Sse sse, SseEventSink sink) { @Path("blocking/xml") @GET @Produces(MediaType.SERVER_SENT_EVENTS) - @RestSseElementType(MediaType.APPLICATION_XML) + @RestStreamElementType(MediaType.APPLICATION_XML) public void blockingSseXml(Sse sse, SseEventSink sink) { if (sink == null) { throw new IllegalStateException("No client connected."); diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java index eff18d0e526d7..83a02a7901269 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive-jsonb/deployment/src/test/java/io/quarkus/resteasy/reactive/jsonb/deployment/test/sse/SseResource.java @@ -10,6 +10,7 @@ import javax.ws.rs.sse.SseBroadcaster; import javax.ws.rs.sse.SseEventSink; +// Using `@RestStreamElementType` on purpose to ensure the backward compatibility. import org.jboss.resteasy.reactive.RestSseElementType; import org.jboss.resteasy.reactive.common.util.RestMediaType; diff --git a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/QuarkusServerEndpointIndexer.java b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/QuarkusServerEndpointIndexer.java index f74a26f9c1634..8e60f903255ff 100644 --- a/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/QuarkusServerEndpointIndexer.java +++ b/extensions/resteasy-reactive/quarkus-resteasy-reactive/deployment/src/main/java/io/quarkus/resteasy/reactive/server/deployment/QuarkusServerEndpointIndexer.java @@ -182,7 +182,7 @@ private boolean isDefaultJson() { } private boolean hasJson(ServerResourceMethod method) { - return hasJson(method.getProduces()) || hasJson(method.getConsumes()) || isJson(method.getSseElementType()); + return hasJson(method.getProduces()) || hasJson(method.getConsumes()) || isJson(method.getStreamElementType()); } private boolean hasJson(String[] types) { diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java index 9fd183e22c1bd..9557c5d9c9909 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/EndpointIndexer.java @@ -49,6 +49,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_QUERY_PARAM; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_RESPONSE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_SSE_ELEMENT_TYPE; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.REST_STREAM_ELEMENT_TYPE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.SET; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.SORTED_SET; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.STRING; @@ -329,15 +330,10 @@ protected List createEndpoints(ClassInfo currentClassInfo, String[] classProduces = extractProducesConsumesValues(getAnnotationStore().getAnnotation(currentClassInfo, PRODUCES)); String[] classConsumes = extractProducesConsumesValues(getAnnotationStore().getAnnotation(currentClassInfo, CONSUMES)); - String classSseElementType = null; - AnnotationInstance classSseElementTypeAnnotation = getAnnotationStore().getAnnotation(currentClassInfo, - REST_SSE_ELEMENT_TYPE); - if (classSseElementTypeAnnotation != null) { - classSseElementType = classSseElementTypeAnnotation.value().asString(); - } + String classStreamElementType = getStreamAnnotationValue(currentClassInfo); BasicResourceClassInfo basicResourceClassInfo = new BasicResourceClassInfo(resourceClassPath, classProduces, - classConsumes, pathParameters, classSseElementType); + classConsumes, pathParameters, classStreamElementType); Set classNameBindings = NameBindingUtil.nameBindingNames(index, currentClassInfo); @@ -564,19 +560,19 @@ private ResourceMethod createResourceMethod(ClassInfo currentClassInfo, ClassInf produces = applyDefaultProduces(produces, nonAsyncReturnType); produces = addDefaultCharsets(produces); - String sseElementType = basicResourceClassInfo.getSseElementType(); - AnnotationInstance sseElementTypeAnnotation = getAnnotationStore().getAnnotation(currentMethodInfo, - REST_SSE_ELEMENT_TYPE); - if (sseElementTypeAnnotation != null) { - sseElementType = sseElementTypeAnnotation.value().asString(); + String streamElementType = basicResourceClassInfo.getStreamElementType(); + String streamElementTypeInMethod = getStreamAnnotationValue(currentMethodInfo); + if (streamElementTypeInMethod != null) { + streamElementType = streamElementTypeInMethod; } + boolean returnsMultipart = false; if (produces != null && produces.length == 1) { - if (sseElementType == null && MediaType.SERVER_SENT_EVENTS.equals(produces[0])) { + if (streamElementType == null && MediaType.SERVER_SENT_EVENTS.equals(produces[0])) { // Handle server sent events responses String[] defaultProducesForType = applyAdditionalDefaults(nonAsyncReturnType); if (defaultProducesForType.length == 1) { - sseElementType = defaultProducesForType[0]; + streamElementType = defaultProducesForType[0]; } } else if (MediaType.MULTIPART_FORM_DATA.equals(produces[0])) { // Handle multipart form data responses @@ -615,7 +611,7 @@ private ResourceMethod createResourceMethod(ClassInfo currentClassInfo, ClassInf .setBlocking(blocking) .setSuspended(suspended) .setSse(sse) - .setSseElementType(sseElementType) + .setStreamElementType(streamElementType) .setFormParamRequired(formParamRequired) .setMultipart(multipart) .setParameters(methodParameters) @@ -646,6 +642,25 @@ protected void handleClientSubResource(ResourceMethod resourceMethod, MethodInfo } + private String getStreamAnnotationValue(AnnotationTarget target) { + String value = getAnnotationValueAsString(target, REST_STREAM_ELEMENT_TYPE); + if (value == null) { + value = getAnnotationValueAsString(target, REST_SSE_ELEMENT_TYPE); + } + + return value; + } + + private String getAnnotationValueAsString(AnnotationTarget target, DotName annotationType) { + String value = null; + AnnotationInstance annotation = getAnnotationStore().getAnnotation(target, annotationType); + if (annotation != null) { + value = annotation.value().asString(); + } + + return value; + } + private boolean isBlocking(MethodInfo info, BlockingDefault defaultValue) { Map.Entry blockingAnnotation = getInheritableAnnotation(info, BLOCKING); Map.Entry nonBlockingAnnotation = getInheritableAnnotation(info, @@ -1375,15 +1390,15 @@ public static class BasicResourceClassInfo { private final String[] produces; private final String[] consumes; private final Set pathParameters; - private final String sseElementType; + private final String streamElementType; public BasicResourceClassInfo(String path, String[] produces, String[] consumes, Set pathParameters, - String sseElementType) { + String streamElementType) { this.path = path; this.produces = produces; this.consumes = consumes; this.pathParameters = pathParameters; - this.sseElementType = sseElementType; + this.streamElementType = streamElementType; } public String getPath() { @@ -1402,8 +1417,8 @@ public Set getPathParameters() { return pathParameters; } - public String getSseElementType() { - return sseElementType; + public String getStreamElementType() { + return streamElementType; } } diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java index 612d273e2cded..2d37bddb44ead 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/ResteasyReactiveDotNames.java @@ -91,6 +91,7 @@ import org.jboss.resteasy.reactive.RestQuery; import org.jboss.resteasy.reactive.RestResponse; import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.RestStreamElementType; import org.reactivestreams.Publisher; public final class ResteasyReactiveDotNames { @@ -111,6 +112,7 @@ public final class ResteasyReactiveDotNames { .createSimple("org.jboss.resteasy.reactive.server.spi.ServerRequestContext"); public static final DotName REST_SSE_ELEMENT_TYPE = DotName.createSimple(RestSseElementType.class.getName()); + public static final DotName REST_STREAM_ELEMENT_TYPE = DotName.createSimple(RestStreamElementType.class.getName()); public static final DotName CONSUMES = DotName.createSimple(Consumes.class.getName()); public static final DotName PRODUCES = DotName.createSimple(Produces.class.getName()); public static final DotName PROVIDER = DotName.createSimple(Provider.class.getName()); diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestSseElementType.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestSseElementType.java index b6d8cf5e197d4..996cad82bbe82 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestSseElementType.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestSseElementType.java @@ -9,11 +9,14 @@ /** * Defines the MIME type of each SSE element in the annotated stream. + * + * @deprecated replaced by {@link RestStreamElementType} */ @Inherited @Target({ ElementType.TYPE, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented +@Deprecated public @interface RestSseElementType { String value(); } diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestStreamElementType.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestStreamElementType.java new file mode 100644 index 0000000000000..4a603863523a8 --- /dev/null +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/RestStreamElementType.java @@ -0,0 +1,19 @@ +package org.jboss.resteasy.reactive; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Defines the MIME type of each SSE element in the annotated stream. + */ +@Inherited +@Target({ ElementType.TYPE, ElementType.METHOD }) +@Retention(RetentionPolicy.RUNTIME) +@Documented +public @interface RestStreamElementType { + String value(); +} diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/model/ResourceMethod.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/model/ResourceMethod.java index 388cd674a01c1..a2ef1d08d7d44 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/model/ResourceMethod.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/model/ResourceMethod.java @@ -6,6 +6,7 @@ import javax.ws.rs.Consumes; import javax.ws.rs.Produces; import org.jboss.resteasy.reactive.RestSseElementType; +import org.jboss.resteasy.reactive.RestStreamElementType; /** * A representation of a REST endpoint. This is passed directly to recorders so must be bytecode serializable. @@ -30,10 +31,11 @@ public class ResourceMethod { private String[] produces; /** - * The value of the {@link RestSseElementType} annotation, if none is specified on the method + * The value of the {@link RestStreamElementType} or the {@link RestSseElementType} annotation, if none is specified on the + * method * then this represents the value inherited from the class level, or null if not specified. */ - private String sseElementType; + private String streamElementType; /** * The value of the {@link Consumes} annotation, if none is specified on the method @@ -68,14 +70,14 @@ public class ResourceMethod { public ResourceMethod() { } - public ResourceMethod(String httpMethod, String path, String[] produces, String sseElementType, String[] consumes, + public ResourceMethod(String httpMethod, String path, String[] produces, String streamElementType, String[] consumes, Set nameBindingNames, String name, String returnType, String simpleReturnType, MethodParameter[] parameters, boolean blocking, boolean suspended, boolean isSse, boolean isFormParamRequired, boolean isMultipart, List subResourceMethods) { this.httpMethod = httpMethod; this.path = path; this.produces = produces; - this.sseElementType = sseElementType; + this.streamElementType = streamElementType; this.consumes = consumes; this.nameBindingNames = nameBindingNames; this.name = name; @@ -220,13 +222,13 @@ public ResourceMethod setMultipart(boolean isMultipart) { return this; } - public ResourceMethod setSseElementType(String sseElementType) { - this.sseElementType = sseElementType; + public ResourceMethod setStreamElementType(String streamElementType) { + this.streamElementType = streamElementType; return this; } - public String getSseElementType() { - return sseElementType; + public String getStreamElementType() { + return streamElementType; } @Override diff --git a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java index f2a034bb46558..57bb4fa6dd0b8 100644 --- a/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java +++ b/independent-projects/resteasy-reactive/common/runtime/src/main/java/org/jboss/resteasy/reactive/common/util/RestMediaType.java @@ -5,14 +5,14 @@ /** * Extended media types in Resteasy Reactive. */ -public final class RestMediaType { +public class RestMediaType extends MediaType { public static final String APPLICATION_NDJSON = "application/x-ndjson"; - public static final MediaType APPLICATION_NDJSON_TYPE = new MediaType("application", "x-ndjson"); + public static final RestMediaType APPLICATION_NDJSON_TYPE = new RestMediaType("application", "x-ndjson"); public static final String APPLICATION_STREAM_JSON = "application/stream+json"; - public static final MediaType APPLICATION_STREAM_JSON_TYPE = new MediaType("application", "stream+json"); - - private RestMediaType() { + public static final RestMediaType APPLICATION_STREAM_JSON_TYPE = new RestMediaType("application", "stream+json"); + public RestMediaType(String type, String subtype) { + super(type, subtype); } } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java index f64b6e824b08b..963d71a82fec5 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/SseUtil.java @@ -117,7 +117,7 @@ private static String serialiseDataToString(ResteasyReactiveRequestContext conte Object entity = event.getData(); Class entityClass = event.getType(); Type entityType = event.getGenericType(); - MediaType mediaType = eventMediaType != null ? eventMediaType : context.getTarget().getSseElementType(); + MediaType mediaType = eventMediaType != null ? eventMediaType : context.getTarget().getStreamElementType(); if (mediaType == null) { mediaType = MediaType.TEXT_PLAIN_TYPE; } @@ -156,8 +156,8 @@ public static void setHeaders(ResteasyReactiveRequestContext context, ServerHttp if (!response.headWritten()) { response.setStatusCode(Response.Status.OK.getStatusCode()); response.setResponseHeader(HttpHeaders.CONTENT_TYPE, MediaType.SERVER_SENT_EVENTS); - if (context.getTarget().getSseElementType() != null) { - response.setResponseHeader(SSE_CONTENT_TYPE, context.getTarget().getSseElementType().toString()); + if (context.getTarget().getStreamElementType() != null) { + response.setResponseHeader(SSE_CONTENT_TYPE, context.getTarget().getStreamElementType().toString()); } response.setChunked(true); for (int i = 0; i < customizers.size(); i++) { diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java index ba8d127755999..23f9b0b2aa024 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java @@ -134,9 +134,9 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, MultivaluedMap score = new QuarkusMultivaluedHashMap<>(); Map pathParameterIndexes = buildParamIndexMap(classPathTemplate, methodPathTemplate); - MediaType sseElementType = null; - if (method.getSseElementType() != null) { - sseElementType = MediaType.valueOf(method.getSseElementType()); + MediaType streamElementType = null; + if (method.getStreamElementType() != null) { + streamElementType = MediaType.valueOf(method.getStreamElementType()); } List consumesMediaTypes; if (method.getConsumes() == null) { @@ -467,7 +467,8 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, clazz.getFactory(), handlers.toArray(EMPTY_REST_HANDLER_ARRAY), method.getName(), parameterDeclaredTypes, nonAsyncReturnType, method.isBlocking(), resourceClass, lazyMethod, - pathParameterIndexes, info.isDevelopmentMode() ? score : null, sseElementType, clazz.resourceExceptionMapper()); + pathParameterIndexes, info.isDevelopmentMode() ? score : null, streamElementType, + clazz.resourceExceptionMapper()); } private void addResponseHandler(ServerResourceMethod method, List handlers) { diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/mapping/RuntimeResource.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/mapping/RuntimeResource.java index df7ce16ece12c..adead56bfd355 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/mapping/RuntimeResource.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/mapping/RuntimeResource.java @@ -32,7 +32,7 @@ public class RuntimeResource { private final ResteasyReactiveResourceInfo lazyMethod; private final Map pathParameterIndexes; private final Map> score; - private final MediaType sseElementType; + private final MediaType streamElementType; private final Map, ResourceExceptionMapper> classExceptionMappers; public RuntimeResource(String httpMethod, URITemplate path, URITemplate classPath, ServerMediaType produces, @@ -42,7 +42,7 @@ public RuntimeResource(String httpMethod, URITemplate path, URITemplate classPat Class[] parameterTypes, Type returnType, boolean blocking, Class resourceClass, ResteasyReactiveResourceInfo lazyMethod, Map pathParameterIndexes, Map> score, - MediaType sseElementType, + MediaType streamElementType, Map, ResourceExceptionMapper> classExceptionMappers) { this.httpMethod = httpMethod; this.path = path; @@ -60,7 +60,7 @@ public RuntimeResource(String httpMethod, URITemplate path, URITemplate classPat this.lazyMethod = lazyMethod; this.pathParameterIndexes = pathParameterIndexes; this.score = score; - this.sseElementType = sseElementType; + this.streamElementType = streamElementType; this.classExceptionMappers = classExceptionMappers; } @@ -120,13 +120,13 @@ public SimpleResourceInfo getSimplifiedResourceInfo() { return new ResteasyReactiveSimplifiedResourceInfo(javaMethodName, resourceClass, parameterTypes); } - public MediaType getSseElementType() { - return sseElementType; + public MediaType getStreamElementType() { + return streamElementType; } /** * The @Path that is present on the class itself - * + * * @return */ public URITemplate getClassPath() { diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/model/ServerResourceMethod.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/model/ServerResourceMethod.java index 199c2282050ed..e1719217f365b 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/model/ServerResourceMethod.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/model/ServerResourceMethod.java @@ -21,12 +21,12 @@ public class ServerResourceMethod extends ResourceMethod { public ServerResourceMethod() { } - public ServerResourceMethod(String httpMethod, String path, String[] produces, String sseElementType, String[] consumes, + public ServerResourceMethod(String httpMethod, String path, String[] produces, String streamElementType, String[] consumes, Set nameBindingNames, String name, String returnType, String simpleReturnType, MethodParameter[] parameters, boolean blocking, boolean suspended, boolean sse, boolean formParamRequired, boolean multipart, List subResourceMethods, Supplier invoker, Set methodAnnotationNames, List handlerChainCustomizers, ParameterExtractor customerParameterExtractor) { - super(httpMethod, path, produces, sseElementType, consumes, nameBindingNames, name, returnType, simpleReturnType, + super(httpMethod, path, produces, streamElementType, consumes, nameBindingNames, name, returnType, simpleReturnType, parameters, blocking, suspended, sse, formParamRequired, multipart, subResourceMethods); this.invoker = invoker; this.methodAnnotationNames = methodAnnotationNames;