Skip to content

Commit

Permalink
ndJson support added
Browse files Browse the repository at this point in the history
Signed-off-by: David Kral <david.k.kral@oracle.com>
  • Loading branch information
Verdent committed Nov 18, 2020
1 parent 864ac1d commit 2f84730
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ public final class MediaType implements AcceptPredicate<MediaType> {
*/
public static final MediaType TEXT_EVENT_STREAM;

/**
* A {@link MediaType} constant representing {@code application/x-ndjson} media type.
*/
public static final MediaType APPLICATION_X_NDJSON;

static {
Map<String, MediaType> knownTypes = new HashMap<>();

Expand Down Expand Up @@ -213,6 +218,9 @@ public final class MediaType implements AcceptPredicate<MediaType> {
TEXT_EVENT_STREAM = new MediaType("text", "event-stream");
knownTypes.put("text/event-stream", TEXT_EVENT_STREAM);

APPLICATION_X_NDJSON = new MediaType("application", "x-ndjson");
knownTypes.put("application/x-ndjson", APPLICATION_X_NDJSON);

KNOWN_TYPES = Collections.unmodifiableMap(knownTypes);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.media.jackson;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

import io.helidon.common.GenericType;
import io.helidon.common.http.DataChunk;
import io.helidon.common.http.MediaType;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import io.helidon.media.common.MessageBodyStreamWriter;
import io.helidon.media.common.MessageBodyWriterContext;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* Message body stream writer supporting object binding with Jackson.
* This writer is for {@link MediaType#APPLICATION_X_NDJSON} media type.
*/
class JacksonNdBodyStreamWriter implements MessageBodyStreamWriter<Object> {

private static final byte[] NL = "\n".getBytes(StandardCharsets.UTF_8);

private final ObjectMapper objectMapper;

private JacksonNdBodyStreamWriter(ObjectMapper objectMapper) {
this.objectMapper = Objects.requireNonNull(objectMapper);
}

static JacksonNdBodyStreamWriter create(ObjectMapper objectMapper) {
return new JacksonNdBodyStreamWriter(objectMapper);
}

@Override
public PredicateResult accept(GenericType<?> type, MessageBodyWriterContext context) {
if (CharSequence.class.isAssignableFrom(type.rawType())) {
return PredicateResult.NOT_SUPPORTED;
}
return context.contentType()
.or(() -> findMediaType(context))
.filter(mediaType -> mediaType.equals(MediaType.APPLICATION_X_NDJSON))
.map(it -> PredicateResult.COMPATIBLE)
.orElse(PredicateResult.NOT_SUPPORTED);
}

@Override
public Multi<DataChunk> write(Flow.Publisher<?> publisher, GenericType<?> type, MessageBodyWriterContext context) {
MediaType contentType = MediaType.APPLICATION_X_NDJSON;
context.contentType(contentType);
JacksonBodyWriter.ObjectToChunks objectToChunks = new JacksonBodyWriter.ObjectToChunks(objectMapper, context.charset());
AtomicBoolean first = new AtomicBoolean(true);
return Multi.create(publisher)
.flatMap(objectToChunks)
.flatMap(dataChunk -> {
if (first.getAndSet(false)) {
return Single.just(dataChunk);
} else {
return Multi.just(DataChunk.create(NL),
dataChunk);
}
});
}

private Optional<MediaType> findMediaType(MessageBodyWriterContext context) {
try {
return Optional.of(context.findAccepted(MediaType.APPLICATION_X_NDJSON));
} catch (IllegalStateException ignore) {
//Not supported. Ignore exception.
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ public final class JacksonSupport implements MediaSupport {
private final JacksonBodyWriter writer;
private final JacksonBodyStreamWriter streamWriter;
private final JacksonEsBodyStreamWriter esStreamWriter;
private final JacksonNdBodyStreamWriter ndStreamWriter;

private JacksonSupport(final ObjectMapper objectMapper) {
this.reader = JacksonBodyReader.create(objectMapper);
this.writer = JacksonBodyWriter.create(objectMapper);
this.streamWriter = JacksonBodyStreamWriter.create(objectMapper);
this.esStreamWriter = JacksonEsBodyStreamWriter.create(objectMapper);
this.ndStreamWriter = JacksonNdBodyStreamWriter.create(objectMapper);
}

/**
Expand Down Expand Up @@ -137,7 +139,7 @@ public static MessageBodyStreamWriter<Object> streamWriter(ObjectMapper objectMa

/**
* Return a default Jackson entity event stream writer.
* This writer is for {@code text/event-stream} content type.
* This writer is for {@link io.helidon.common.http.MediaType#TEXT_EVENT_STREAM} content type.
*
* @return new Jackson body stream writer instance
*/
Expand All @@ -147,7 +149,7 @@ public static MessageBodyStreamWriter<Object> eventStreamWriter() {

/**
* Create a new Jackson entity stream writer based on {@link ObjectMapper} instance.
* This writer is for {@code text/event-stream} content type.
* This writer is for {@link io.helidon.common.http.MediaType#TEXT_EVENT_STREAM} content type.
*
* @param objectMapper object mapper instance
* @return new Jackson body stream writer instance
Expand All @@ -157,6 +159,28 @@ public static MessageBodyStreamWriter<Object> eventStreamWriter(ObjectMapper obj
return JacksonEsBodyStreamWriter.create(objectMapper);
}

/**
* Return a default Jackson entity event stream writer.
* This writer is for {@link io.helidon.common.http.MediaType#APPLICATION_X_NDJSON} content type.
*
* @return new Jackson body stream writer instance
*/
public static MessageBodyStreamWriter<Object> ndJsonStreamWriter() {
return DEFAULT.get().ndStreamWriter;
}

/**
* Create a new Jackson entity stream writer based on {@link ObjectMapper} instance.
* This writer is for {@link io.helidon.common.http.MediaType#APPLICATION_X_NDJSON} content type.
*
* @param objectMapper object mapper instance
* @return new Jackson body stream writer instance
*/
public static MessageBodyStreamWriter<Object> ndJsonStreamWriter(ObjectMapper objectMapper) {
Objects.requireNonNull(objectMapper);
return JacksonNdBodyStreamWriter.create(objectMapper);
}

/**
* Return Jackson reader instance.
*
Expand Down Expand Up @@ -185,14 +209,23 @@ public MessageBodyStreamWriter<Object> streamWriterInstance() {
}

/**
* Return Jackson stream writer instance for {@code text/event-stream} content type.
* Return Jackson stream writer instance for {@link io.helidon.common.http.MediaType#TEXT_EVENT_STREAM} content type.
*
* @return Jackson event stream writer instance
*/
public MessageBodyStreamWriter<Object> eventStreamWriterInstance() {
return esStreamWriter;
}

/**
* Return Jackson stream writer instance for {@link io.helidon.common.http.MediaType#APPLICATION_X_NDJSON} content type.
*
* @return Jackson event stream writer instance
*/
public MessageBodyStreamWriter<Object> ndJsonStreamWriterInstance() {
return ndStreamWriter;
}

@Override
public Collection<MessageBodyReader<?>> readers() {
return List.of(reader);
Expand All @@ -205,6 +238,6 @@ public Collection<MessageBodyWriter<?>> writers() {

@Override
public Collection<MessageBodyStreamWriter<?>> streamWriters() {
return List.of(streamWriter, esStreamWriter);
return List.of(streamWriter, ndStreamWriter, esStreamWriter);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.media.jsonb;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.json.bind.Jsonb;

import io.helidon.common.GenericType;
import io.helidon.common.http.DataChunk;
import io.helidon.common.http.MediaType;
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import io.helidon.media.common.MessageBodyStreamWriter;
import io.helidon.media.common.MessageBodyWriterContext;

/**
* Message body stream writer supporting object binding with JSON-B.
* This writer is for {@link MediaType#APPLICATION_X_NDJSON} media type.
*/
class JsonbNdBodyStreamWriter implements MessageBodyStreamWriter<Object> {

private static final byte[] NL = "\n".getBytes(StandardCharsets.UTF_8);

private final Jsonb jsonb;

private JsonbNdBodyStreamWriter(Jsonb jsonb) {
this.jsonb = Objects.requireNonNull(jsonb);
}

static JsonbNdBodyStreamWriter create(Jsonb jsonb) {
return new JsonbNdBodyStreamWriter(jsonb);
}

@Override
public PredicateResult accept(GenericType<?> type, MessageBodyWriterContext context) {
if (CharSequence.class.isAssignableFrom(type.rawType())) {
return PredicateResult.NOT_SUPPORTED;
}
return context.contentType()
.or(() -> findMediaType(context))
.filter(mediaType -> mediaType.equals(MediaType.APPLICATION_X_NDJSON))
.map(it -> PredicateResult.COMPATIBLE)
.orElse(PredicateResult.NOT_SUPPORTED);
}

@Override
public Multi<DataChunk> write(Flow.Publisher<?> publisher, GenericType<?> type, MessageBodyWriterContext context) {
MediaType contentType = MediaType.APPLICATION_X_NDJSON;
context.contentType(contentType);

AtomicBoolean first = new AtomicBoolean(true);

return Multi.create(publisher)
.map(object -> DataChunk.create(jsonb.toJson(object).getBytes(StandardCharsets.UTF_8)))
.flatMap(dataChunk -> {
if (first.getAndSet(false)) {
return Single.just(dataChunk);
} else {
return Multi.just(DataChunk.create(NL),
dataChunk);
}
});
}

private Optional<MediaType> findMediaType(MessageBodyWriterContext context) {
try {
return Optional.of(context.findAccepted(MediaType.APPLICATION_X_NDJSON));
} catch (IllegalStateException ignore) {
//Not supported. Ignore exception.
return Optional.empty();
}
}
}
41 changes: 37 additions & 4 deletions media/jsonb/src/main/java/io/helidon/media/jsonb/JsonbSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ public final class JsonbSupport implements MediaSupport {
private final JsonbBodyWriter writer;
private final JsonbBodyStreamWriter streamWriter;
private final JsonbEsBodyStreamWriter esStreamWriter;
private final JsonbNdBodyStreamWriter ndStreamWriter;

private JsonbSupport(Jsonb jsonb) {
this.reader = JsonbBodyReader.create(jsonb);
this.writer = JsonbBodyWriter.create(jsonb);
this.streamWriter = JsonbBodyStreamWriter.create(jsonb);
this.esStreamWriter = JsonbEsBodyStreamWriter.create(jsonb);
this.ndStreamWriter = JsonbNdBodyStreamWriter.create(jsonb);
}

/**
Expand Down Expand Up @@ -138,7 +140,7 @@ public static MessageBodyStreamWriter<Object> streamWriter(Jsonb jsonb) {

/**
* Return a default JSON-B entity event stream writer.
* This writer is for {@code text/event-stream} content type.
* This writer is for {@link io.helidon.common.http.MediaType#TEXT_EVENT_STREAM} content type.
*
* @return new JSON-B body stream writer instance
*/
Expand All @@ -148,7 +150,7 @@ public static MessageBodyStreamWriter<Object> eventStreamWriter() {

/**
* Create a new JSON-B entity stream writer based on {@link Jsonb} instance.
* This writer is for {@code text/event-stream} content type.
* This writer is for {@link io.helidon.common.http.MediaType#TEXT_EVENT_STREAM} content type.
*
* @param jsonb jsonb instance
* @return new JSON-B body stream writer instance
Expand All @@ -158,6 +160,28 @@ public static MessageBodyStreamWriter<Object> eventStreamWriter(Jsonb jsonb) {
return JsonbEsBodyStreamWriter.create(jsonb);
}

/**
* Return a default JSON-B entity event stream writer.
* This writer is for {@link io.helidon.common.http.MediaType#APPLICATION_X_NDJSON} content type.
*
* @return new JSON-B body stream writer instance
*/
public static MessageBodyStreamWriter<Object> ndJsonStreamWriter() {
return DEFAULT.get().ndStreamWriter;
}

/**
* Create a new JSON-B entity stream writer based on {@link Jsonb} instance.
* This writer is for {@link io.helidon.common.http.MediaType#APPLICATION_X_NDJSON} content type.
*
* @param jsonb jsonb instance
* @return new JSON-B body stream writer instance
*/
public static MessageBodyStreamWriter<Object> ndJsonStreamWriter(Jsonb jsonb) {
Objects.requireNonNull(jsonb);
return JsonbNdBodyStreamWriter.create(jsonb);
}

/**
* Return JSON-B reader instance.
*
Expand Down Expand Up @@ -186,14 +210,23 @@ public MessageBodyStreamWriter<Object> streamWriterInstance() {
}

/**
* Return JSON-B stream writer instance for {@code text/event-stream} content type.
* Return JSON-B stream writer instance for {@link io.helidon.common.http.MediaType#TEXT_EVENT_STREAM} content type.
*
* @return JSON-B event stream writer instance
*/
public MessageBodyStreamWriter<Object> eventStreamWriterInstance() {
return esStreamWriter;
}

/**
* Return JSON-B stream writer instance for {@link io.helidon.common.http.MediaType#APPLICATION_X_NDJSON} content type.
*
* @return JSON-B event stream writer instance
*/
public MessageBodyStreamWriter<Object> ndJsonStreamWriterInstance() {
return ndStreamWriter;
}


@Override
public Collection<MessageBodyReader<?>> readers() {
Expand All @@ -207,7 +240,7 @@ public Collection<MessageBodyWriter<?>> writers() {

@Override
public Collection<MessageBodyStreamWriter<?>> streamWriters() {
return List.of(streamWriter, esStreamWriter);
return List.of(streamWriter, ndStreamWriter, esStreamWriter);
}

}
Loading

1 comment on commit 2f84730

@dalexandrov
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks awesome :)

Please sign in to comment.