Skip to content

Commit

Permalink
Add JSON streaming for RESTEasy Reactive Jsonb and Jackson
Browse files Browse the repository at this point in the history
We now support json streaming. Example:

```
@path("stream-json/multi")
    @get
    @produces(SseMediaType.APPLICATION_STREAM_JSON)
    @RestSseElementType(MediaType.APPLICATION_JSON)
    public Multi<Message> multiStreamJson() {
        return Multi.createFrom().items(new Message("hello"), new Message("stef"));
    }
```

We support "application/x-ndjson" and "application/stream+json".

Moreover, I've added a new utility class called `SseMediaType` to be used by users. The implementation is similar to `MediaType`.

Questions:
- Added for both JSONB and Jackson extensions, is this correct?
- I've copied the existing tests we had for sse in JSONB to Jackson, is this correct?
  • Loading branch information
Sgitario committed Oct 20, 2021
1 parent d634dd9 commit 1e32c23
Show file tree
Hide file tree
Showing 10 changed files with 442 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@
<artifactId>quarkus-hibernate-validator-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jaxrs-client-reactive-deployment</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.common.model.ResourceMethod;
import org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames;
import org.jboss.resteasy.reactive.common.util.SseMediaType;
import org.jboss.resteasy.reactive.server.util.MethodId;

import com.fasterxml.jackson.annotation.JsonView;
Expand Down Expand Up @@ -137,15 +138,18 @@ void additionalProviders(List<JacksonFeatureBuildItem> jacksonFeatureBuildItems,
additionalWriters
.produce(new MessageBodyWriterBuildItem(getJacksonMessageBodyWriter(applicationNeedsSpecialJacksonFeatures),
Object.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, SseMediaType.APPLICATION_NDJSON,
SseMediaType.APPLICATION_STREAM_JSON)));
additionalWriters
.produce(new MessageBodyWriterBuildItem(VertxJsonArrayMessageBodyWriter.class.getName(),
JsonArray.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, SseMediaType.APPLICATION_NDJSON,
SseMediaType.APPLICATION_STREAM_JSON)));
additionalWriters
.produce(new MessageBodyWriterBuildItem(VertxJsonObjectMessageBodyWriter.class.getName(),
JsonObject.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, SseMediaType.APPLICATION_NDJSON,
SseMediaType.APPLICATION_STREAM_JSON)));
}

private String getJacksonMessageBodyWriter(boolean applicationNeedsSpecialJacksonFeatures) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;

public class Message {
public String name;

public Message(String name) {
this.name = name;
}

// for Jsonb
public Message() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;

import java.io.IOException;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;

import org.jboss.resteasy.reactive.RestSseElementType;
import org.jboss.resteasy.reactive.common.util.SseMediaType;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;

@Path("sse")
public class SseResource {

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sse(Sse sse, SseEventSink sink) {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();

sseBroadcaster.register(sink);
sseBroadcaster.broadcast(sse.newEventBuilder().data("hello").build())
.thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data("stef").build()))
.thenAccept(v -> sseBroadcaster.close());
}

@Path("multi")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> multiText() {
return Multi.createFrom().items("hello", "stef");
}

@Path("json")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestSseElementType(MediaType.APPLICATION_JSON)
public void sseJson(Sse sse, SseEventSink sink) throws IOException {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();

sseBroadcaster.register(sink);
sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("hello")).build())
.thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("stef")).build()))
.thenAccept(v -> sseBroadcaster.close());
}

@Blocking
@Path("blocking/json")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestSseElementType(MediaType.APPLICATION_JSON)
public void blockingSseJson(Sse sse, SseEventSink sink) throws IOException {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();

sseBroadcaster.register(sink);
sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("hello")).build())
.thenCompose(v -> sseBroadcaster.broadcast(sse.newEventBuilder().data(new Message("stef")).build()))
.thenAccept(v -> sseBroadcaster.close());
}

@Path("json2")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public void sseJson2(Sse sse, SseEventSink sink) throws IOException {
if (sink == null) {
throw new IllegalStateException("No client connected.");
}
SseBroadcaster sseBroadcaster = sse.newBroadcaster();

// Same as sseJson but set mediaType in builder
sseBroadcaster.register(sink);
sseBroadcaster
.broadcast(sse.newEventBuilder().data(new Message("hello")).mediaType(MediaType.APPLICATION_JSON_TYPE).build())
.thenCompose(v -> sseBroadcaster.broadcast(
sse.newEventBuilder().mediaType(MediaType.APPLICATION_JSON_TYPE).data(new Message("stef")).build()))
.thenAccept(v -> sseBroadcaster.close());
}

@Path("json/multi")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestSseElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("json/multi2")
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<Message> multiDefaultElementType() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("ndjson/multi")
@GET
@Produces(SseMediaType.APPLICATION_NDJSON)
@RestSseElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiNdJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("stream-json/multi")
@GET
@Produces(SseMediaType.APPLICATION_STREAM_JSON)
@RestSseElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiStreamJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package io.quarkus.resteasy.reactive.jackson.deployment.test.sse;

import static io.restassured.RestAssured.when;
import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;

import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.SseEventSource;

import org.apache.http.HttpStatus;
import org.jboss.resteasy.reactive.client.impl.MultiInvoker;
import org.jboss.resteasy.reactive.common.util.SseMediaType;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.smallrye.mutiny.Multi;

public class SseTestCase {

@TestHTTPResource
URI uri;

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(SseResource.class, Message.class));

@Test
public void testSseFromSse() throws Exception {
testSse("sse");
}

@Test
public void testSseFromMulti() throws Exception {
testSse("sse/multi");
}

private void testSse(String path) throws Exception {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + path);
// do not reconnect
try (SseEventSource eventSource = SseEventSource.target(target).reconnectingEvery(Integer.MAX_VALUE, TimeUnit.SECONDS)
.build()) {
CompletableFuture<List<String>> res = new CompletableFuture<>();
List<String> collect = Collections.synchronizedList(new ArrayList<>());
eventSource.register(new Consumer<InboundSseEvent>() {
@Override
public void accept(InboundSseEvent inboundSseEvent) {
collect.add(inboundSseEvent.readData());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
res.completeExceptionally(throwable);
}
}, () -> {
res.complete(collect);
});
eventSource.open();
assertThat(res.get(5, TimeUnit.SECONDS)).containsExactly("hello", "stef");
}
}

@Test
public void testMultiFromSse() {
testMulti("sse");
}

@Test
public void testMultiFromMulti() {
testMulti("sse/multi");
}

private void testMulti(String path) {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + path);
Multi<String> multi = target.request().rx(MultiInvoker.class).get(String.class);
List<String> list = multi.collect().asList().await().atMost(Duration.ofSeconds(30));
assertThat(list).containsExactly("hello", "stef");
}

@Test
public void testJsonMultiFromSse() {
testJsonMulti("sse/json");
testJsonMulti("sse/json2");
testJsonMulti("sse/blocking/json");
}

@Test
public void testJsonMultiFromMulti() {
testJsonMulti("sse/json/multi");
}

@Test
public void testJsonMultiFromMultiWithDefaultElementType() {
testJsonMulti("sse/json/multi2");
}

@Test
public void testNdJsonMultiFromMulti() {
when().get(uri.toString() + "sse/ndjson/multi")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}/n"
+ "{\"name\":\"stef\"}/n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(SseMediaType.APPLICATION_NDJSON));
}

@Test
public void testStreamJsonMultiFromMulti() {
when().get(uri.toString() + "sse/stream-json/multi")
.then().statusCode(HttpStatus.SC_OK)
// @formatter:off
.body(is("{\"name\":\"hello\"}/n"
+ "{\"name\":\"stef\"}/n"))
// @formatter:on
.header(HttpHeaders.CONTENT_TYPE, containsString(SseMediaType.APPLICATION_STREAM_JSON));
}

private void testJsonMulti(String path) {
Client client = ClientBuilder.newBuilder().build();
WebTarget target = client.target(uri.toString() + path);
Multi<Message> multi = target.request().rx(MultiInvoker.class).get(Message.class);
List<Message> list = multi.collect().asList().await().atMost(Duration.ofSeconds(30));
assertThat(list).extracting("name").containsExactly("hello", "stef");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import javax.ws.rs.core.MediaType;

import org.jboss.resteasy.reactive.common.util.SseMediaType;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.Feature;
import io.quarkus.deployment.annotations.BuildProducer;
Expand Down Expand Up @@ -52,7 +54,7 @@ void additionalProviders(BuildProducer<AdditionalBeanBuildItem> additionalBean,
additionalReaders.produce(new MessageBodyReaderBuildItem(JsonbMessageBodyReader.class.getName(), Object.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
additionalWriters.produce(new MessageBodyWriterBuildItem(JsonbMessageBodyWriter.class.getName(), Object.class.getName(),
Collections.singletonList(MediaType.APPLICATION_JSON)));
List.of(MediaType.APPLICATION_JSON, SseMediaType.APPLICATION_NDJSON, SseMediaType.APPLICATION_STREAM_JSON)));
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import javax.ws.rs.sse.SseEventSink;

import org.jboss.resteasy.reactive.RestSseElementType;
import org.jboss.resteasy.reactive.common.util.SseMediaType;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -105,4 +106,20 @@ public Multi<Message> multiDefaultElementType() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("ndjson/multi")
@GET
@Produces(SseMediaType.APPLICATION_NDJSON)
@RestSseElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiNdJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

@Path("stream-json/multi")
@GET
@Produces(SseMediaType.APPLICATION_STREAM_JSON)
@RestSseElementType(MediaType.APPLICATION_JSON)
public Multi<Message> multiStreamJson() {
return Multi.createFrom().items(new Message("hello"), new Message("stef"));
}

}
Loading

0 comments on commit 1e32c23

Please sign in to comment.