Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.1.2: Smart async writer in webserver #9292

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.common.socket;

import java.util.concurrent.ExecutorService;

import io.helidon.common.buffers.BufferData;

/**
* A special socket write that starts async but may switch to sync mode if it
* detects that the async queue size is below {@link #QUEUE_SIZE_THRESHOLD}.
* If it switches to sync mode, it shall never return back to async mode.
*/
public class SmartSocketWriter extends SocketWriter {
private static final long WINDOW_SIZE = 1000;
private static final double QUEUE_SIZE_THRESHOLD = 2.0;

private final SocketWriterAsync asyncWriter;
private volatile long windowIndex;
private volatile boolean asyncMode;

SmartSocketWriter(ExecutorService executor, HelidonSocket socket, int writeQueueLength) {
super(socket);
this.asyncWriter = new SocketWriterAsync(executor, socket, writeQueueLength);
this.asyncMode = true;
this.windowIndex = 0L;
}

@Override
public void write(BufferData... buffers) {
for (BufferData buffer : buffers) {
write(buffer);
}
}

@Override
public void write(BufferData buffer) {
if (asyncMode) {
asyncWriter.write(buffer);
if (++windowIndex % WINDOW_SIZE == 0 && asyncWriter.avgQueueSize() < QUEUE_SIZE_THRESHOLD) {
asyncMode = false;
}
} else {
asyncWriter.drainQueue();
writeNow(buffer); // blocking write
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,15 +44,19 @@ protected SocketWriter(HelidonSocket socket) {
* @param socket socket to write to
* @param writeQueueLength maximal number of queued writes, write operation will block if the queue is full; if set to
* {code 1} or lower, write queue is disabled and writes are direct to socket (blocking)
* @param smartAsyncWrites flag to enable smart async writes, see {@link io.helidon.common.socket.SmartSocketWriter}
* @return a new socket writer
*/
public static SocketWriter create(ExecutorService executor,
HelidonSocket socket,
int writeQueueLength) {
int writeQueueLength,
boolean smartAsyncWrites) {
if (writeQueueLength <= 1) {
return new SocketWriterDirect(socket);
} else {
return new SocketWriterAsync(executor, socket, writeQueueLength);
return smartAsyncWrites
? new SmartSocketWriter(executor, socket, writeQueueLength)
: new SocketWriterAsync(executor, socket, writeQueueLength);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,13 +33,15 @@
class SocketWriterAsync extends SocketWriter implements DataWriter {
private static final System.Logger LOGGER = System.getLogger(SocketWriterAsync.class.getName());
private static final BufferData CLOSING_TOKEN = BufferData.empty();

private final ExecutorService executor;
private final ArrayBlockingQueue<BufferData> writeQueue;
private final CountDownLatch cdl = new CountDownLatch(1);
private final AtomicBoolean started = new AtomicBoolean(false);
private volatile Throwable caught;
private volatile boolean run = true;
private Thread thread;
private double avgQueueSize;

/**
* A new socket writer.
Expand Down Expand Up @@ -116,14 +118,16 @@ private void run() {
CompositeBufferData toWrite = BufferData.createComposite(writeQueue.take()); // wait if the queue is empty
// we only want to read a certain amount of data, if somebody writes huge amounts
// we could spin here forever and run out of memory
for (int i = 0; i < 1000; i++) {
int queueSize = 1;
for (; queueSize <= 1000; queueSize++) {
BufferData newBuf = writeQueue.poll(); // drain ~all elements from the queue, don't wait.
if (newBuf == null) {
break;
}
toWrite.add(newBuf);
}
writeNow(toWrite);
avgQueueSize = (avgQueueSize + queueSize) / 2.0;
}
cdl.countDown();
} catch (Throwable e) {
Expand All @@ -141,4 +145,15 @@ private void checkRunning() {
throw new SocketWriterException(caught);
}
}

void drainQueue() {
BufferData buffer;
while ((buffer = writeQueue.poll()) != null) {
writeNow(buffer);
}
}

double avgQueueSize() {
return avgQueueSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ public final void run() {
}

reader = new DataReader(new MapExceptionDataSupplier(helidonSocket));
writer = SocketWriter.create(listenerContext.executor(), helidonSocket,
listenerContext.config().writeQueueLength());
writer = SocketWriter.create(listenerContext.executor(),
helidonSocket,
listenerConfig.writeQueueLength(),
listenerConfig.smartAsyncWrites());
} catch (Exception e) {
throw e instanceof RuntimeException re ? re : new RuntimeException(e); // see ServerListener
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ interface ListenerConfigBlueprint {
@Option.DefaultInt(0)
int writeQueueLength();

/**
* If enabled and {@link #writeQueueLength()} is greater than 1, then
* start with async writes but possibly switch to sync writes if
* async queue size is always below a certain threshold.
*
* @return smart async setting
*/
@Option.Configured
@Option.DefaultBoolean(false)
boolean smartAsyncWrites();

/**
* Initial buffer size in bytes of {@link java.io.BufferedOutputStream} created internally to
* write data to a socket connection. Default is {@code 4096}.
Expand Down
Loading