Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Provides a new implementation for SSE in the webserver. This implemen…
Browse files Browse the repository at this point in the history
…tation does not use the normal output stream to serialize the events to avoid problems with buffering and chunked encoding. Instead, it writes directly to the underlying socket writer and flushes data as needed. As a result, chunked encoding is no longer used for SSE. Several tests have been updated.
spericas committed Aug 29, 2024
1 parent 7f7da14 commit 197f68d
Showing 14 changed files with 427 additions and 91 deletions.
Original file line number Diff line number Diff line change
@@ -605,6 +605,15 @@ public SocketHttpClient sendChunk(String payload) throws IOException {
return this;
}

/**
* Provides access to underlying socket reader.
*
* @return the reader
*/
public BufferedReader socketReader() {
return socketReader;
}

/**
* Override this to send a specific payload.
*
Original file line number Diff line number Diff line change
@@ -411,23 +411,31 @@ public int read() {
if (finished) {
return -1;
}
ensureBuffer(512);
if (finished || currentBuffer == null) {
try {
ensureBuffer(512);
if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read();
} catch (DataReader.InsufficientDataAvailableException e) {
return -1;
}
return currentBuffer.read();
}

@Override
public int read(byte[] b, int off, int len) {
if (finished) {
return -1;
}
ensureBuffer(len);
if (finished || currentBuffer == null) {
try {
ensureBuffer(len);
if (finished || currentBuffer == null) {
return -1;
}
return currentBuffer.read(b, off, len);
} catch (DataReader.InsufficientDataAvailableException e) {
return -1;
}
return currentBuffer.read(b, off, len);
}

private void ensureBuffer(int estimate) {
Original file line number Diff line number Diff line change
@@ -92,7 +92,6 @@ public <X extends Source<SseEvent>> void handle(X source, HttpClientResponse res
emit = false;
}
}

source.onClose();
} catch (IOException e) {
source.onError(e);
165 changes: 116 additions & 49 deletions webserver/sse/src/main/java/io/helidon/webserver/sse/SseSink.java
Original file line number Diff line number Diff line change
@@ -16,23 +16,33 @@

package io.helidon.webserver.sse;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.BiConsumer;

import io.helidon.common.GenericType;
import io.helidon.common.buffers.BufferData;
import io.helidon.common.media.type.MediaType;
import io.helidon.common.media.type.MediaTypes;
import io.helidon.http.DateTime;
import io.helidon.http.Header;
import io.helidon.http.HeaderNames;
import io.helidon.http.HttpMediaType;
import io.helidon.http.ServerResponseHeaders;
import io.helidon.http.Status;
import io.helidon.http.WritableHeaders;
import io.helidon.http.media.EntityWriter;
import io.helidon.http.media.MediaContext;
import io.helidon.http.sse.SseEvent;
import io.helidon.webserver.ConnectionContext;
import io.helidon.webserver.http.ServerResponse;
import io.helidon.webserver.http.spi.Sink;
import io.helidon.webserver.http.spi.SinkProviderContext;

import static io.helidon.http.HeaderValues.CONTENT_TYPE_EVENT_STREAM;
import static io.helidon.http.HeaderValues.create;

/**
* Implementation of an SSE sink. Emits {@link SseEvent}s.
@@ -44,71 +54,128 @@ public class SseSink implements Sink<SseEvent> {
*/
public static final GenericType<SseSink> TYPE = GenericType.create(SseSink.class);

private static final Header CACHE_NO_CACHE_ONLY = create(HeaderNames.CACHE_CONTROL, "no-cache");
private static final byte[] SSE_NL = "\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] SSE_ID = "id:".getBytes(StandardCharsets.UTF_8);
private static final byte[] SSE_DATA = "data:".getBytes(StandardCharsets.UTF_8);
private static final byte[] SSE_EVENT = "event:".getBytes(StandardCharsets.UTF_8);
private static final byte[] SSE_COMMENT = ":".getBytes(StandardCharsets.UTF_8);
private static final byte[] OK_200 = "HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8);
private static final byte[] DATE = "Date: ".getBytes(StandardCharsets.UTF_8);

private final BiConsumer<Object, MediaType> eventConsumer;
private final Runnable closeRunnable;
private final OutputStream outputStream;

SseSink(ServerResponse serverResponse, BiConsumer<Object, MediaType> eventConsumer, Runnable closeRunnable) {
// Verify response has no status or content type
HttpMediaType ct = serverResponse.headers().contentType().orElse(null);
if (serverResponse.status().code() != Status.OK_200.code()
|| ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) {
throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse");
}
private static final WritableHeaders<?> EMPTY_HEADERS = WritableHeaders.create();

// Ensure content type set for SSE
if (ct == null) {
serverResponse.headers().add(CONTENT_TYPE_EVENT_STREAM);
}
private final ServerResponse response;
private final ConnectionContext ctx;
private final MediaContext mediaContext;
private final Runnable closeRunnable;

this.outputStream = serverResponse.outputStream();
this.eventConsumer = eventConsumer;
this.closeRunnable = closeRunnable;
SseSink(SinkProviderContext context) {
this.response = context.serverResponse();
this.ctx = context.connectionContext();
this.mediaContext = ctx.listenerContext().mediaContext();
this.closeRunnable = context.closeRunnable();
initResponse();
}

@Override
public SseSink emit(SseEvent sseEvent) {
try {
Optional<String> comment = sseEvent.comment();
if (comment.isPresent()) {
outputStream.write(SSE_COMMENT);
outputStream.write(comment.get().getBytes(StandardCharsets.UTF_8));
outputStream.write(SSE_NL);
}
Optional<String> id = sseEvent.id();
if (id.isPresent()) {
outputStream.write(SSE_ID);
outputStream.write(id.get().getBytes(StandardCharsets.UTF_8));
outputStream.write(SSE_NL);
}
Optional<String> name = sseEvent.name();
if (name.isPresent()) {
outputStream.write(SSE_EVENT);
outputStream.write(name.get().getBytes(StandardCharsets.UTF_8));
outputStream.write(SSE_NL);
}
Object data = sseEvent.data();
if (data != SseEvent.NO_DATA) {
outputStream.write(SSE_DATA);
eventConsumer.accept(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN));
outputStream.write(SSE_NL);
}
outputStream.write(SSE_NL);
outputStream.flush();
} catch (IOException e) {
throw new UncheckedIOException(e);
BufferData bufferData = BufferData.growing(512);

Optional<String> comment = sseEvent.comment();
if (comment.isPresent()) {
bufferData.write(SSE_COMMENT);
bufferData.write(comment.get().getBytes(StandardCharsets.UTF_8));
bufferData.write(SSE_NL);
}
Optional<String> id = sseEvent.id();
if (id.isPresent()) {
bufferData.write(SSE_ID);
bufferData.write(id.get().getBytes(StandardCharsets.UTF_8));
bufferData.write(SSE_NL);
}
Optional<String> name = sseEvent.name();
if (name.isPresent()) {
bufferData.write(SSE_EVENT);
bufferData.write(name.get().getBytes(StandardCharsets.UTF_8));
bufferData.write(SSE_NL);
}
Object data = sseEvent.data();
if (data != null) {
bufferData.write(SSE_DATA);
byte[] bytes = serializeData(data, sseEvent.mediaType().orElse(MediaTypes.TEXT_PLAIN));
bufferData.write(bytes);
bufferData.write(SSE_NL);
}
bufferData.write(SSE_NL);

// write event to the network
ctx.dataWriter().writeNow(bufferData);
return this;
}

@Override
public void close() {
closeRunnable.run();
ctx.serverSocket().close();
}

void initResponse() {
ServerResponseHeaders headers = response.headers();

// verify response has no status or content type
HttpMediaType ct = headers.contentType().orElse(null);
if (response.status().code() != Status.OK_200.code()
|| ct != null && !CONTENT_TYPE_EVENT_STREAM.values().equals(ct.mediaType().text())) {
throw new IllegalStateException("ServerResponse instance cannot be used to create SseResponse");
}
if (ct == null) {
headers.add(CONTENT_TYPE_EVENT_STREAM);
}
headers.add(CACHE_NO_CACHE_ONLY);

// start writing heading to buffer
BufferData buffer = BufferData.growing(256);
buffer.write(OK_200);

// serialize headers
if (!headers.contains(HeaderNames.DATE)) {
buffer.write(DATE);
byte[] dateBytes = DateTime.http1Bytes();
buffer.write(dateBytes);
}
for (Header header : headers) {
header.writeHttp1Header(buffer);
}

// complete heading
buffer.write('\r'); // "\r\n" - empty line after headers
buffer.write('\n');

// write response heading to the network
ctx.dataWriter().writeNow(buffer);
}

private byte[] serializeData(Object object, MediaType mediaType) {
if (object instanceof byte[] bytes) {
return bytes;
} else if (mediaContext != null) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
if (object instanceof String str && mediaType.equals(MediaTypes.TEXT_PLAIN)) {
EntityWriter<String> writer = mediaContext.writer(GenericType.STRING, EMPTY_HEADERS, EMPTY_HEADERS);
writer.write(GenericType.STRING, str, baos, EMPTY_HEADERS, EMPTY_HEADERS);
} else {
GenericType<Object> type = GenericType.create(object);
WritableHeaders<?> resHeaders = WritableHeaders.create();
resHeaders.set(HeaderNames.CONTENT_TYPE, mediaType.text());
EntityWriter<Object> writer = mediaContext.writer(type, EMPTY_HEADERS, resHeaders);
writer.write(type, object, baos, EMPTY_HEADERS, resHeaders);
}
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
throw new IllegalStateException("Unable to serialize SSE event without a media context");
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
import io.helidon.webserver.http.ServerResponse;
import io.helidon.webserver.http.spi.Sink;
import io.helidon.webserver.http.spi.SinkProvider;
import io.helidon.webserver.http.spi.SinkProviderContext;

/**
* Sink provider for SSE type.
@@ -37,10 +38,18 @@ public boolean supports(GenericType<? extends Sink<?>> type, ServerRequest reque
return SseSink.TYPE.equals(type) && request.headers().isAccepted(MediaTypes.TEXT_EVENT_STREAM);
}


@Override
@SuppressWarnings("unchecked")
public <X extends Sink<SseEvent>> X create(ServerResponse response, BiConsumer<Object, MediaType> eventConsumer,
Runnable closeRunnable) {
return (X) new SseSink(response, eventConsumer, closeRunnable);
public <X extends Sink<SseEvent>> X create(SinkProviderContext context) {
return (X) new SseSink(context);
}

@Override
public <X extends Sink<SseEvent>> X create(ServerResponse response,
BiConsumer<Object, MediaType> eventConsumer,
Runnable closeRunnable) {
throw new UnsupportedOperationException("Deprecated, use other create method in class");
}

}
3 changes: 2 additions & 1 deletion webserver/sse/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@

requires static io.helidon.common.features.api;

requires io.helidon.common.socket;
requires transitive io.helidon.common;
requires transitive io.helidon.http.sse;
requires transitive io.helidon.webserver;
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -138,4 +138,9 @@ public DirectHandlers directHandlers() {
public ListenerConfig config() {
return listenerConfiguration;
}

@Override
public HelidonSocket serverSocket() {
return serverSocket;
}
}
Loading

0 comments on commit 197f68d

Please sign in to comment.