Skip to content

Commit

Permalink
Support OpenSSL Provider with default Netty allocator
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Dec 6, 2022
1 parent 913cb3a commit dcc6d72
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.internal.SocketUtils;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.List;

public class Netty4NioServerSocketChannel extends NioServerSocketChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Netty4NioServerSocketChannel.class);

public Netty4NioServerSocketChannel() {
super();
}

public Netty4NioServerSocketChannel(SelectorProvider provider) {
super(provider);
}

public Netty4NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
super(provider, family);
}

public Netty4NioServerSocketChannel(ServerSocketChannel channel) {
super(channel);
}

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());

try {
if (ch != null) {
buf.add(new Netty4NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}

return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.Booleans;
Expand Down Expand Up @@ -181,7 +180,7 @@ public static Class<? extends ServerChannel> getServerChannelType() {
if (ALLOCATOR instanceof NoDirectBuffers) {
return CopyBytesServerSocketChannel.class;
} else {
return NioServerSocketChannel.class;
return Netty4NioServerSocketChannel.class;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,13 @@ static BytesReference fromByteBuffers(ByteBuffer[] buffers) {
* Returns BytesReference composed of the provided ByteBuffer.
*/
static BytesReference fromByteBuffer(ByteBuffer buffer) {
assert buffer.hasArray();
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
if (buffer.hasArray()) {
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
} else {
final byte[] array = new byte[buffer.remaining()];
buffer.asReadOnlyBuffer().get(array, buffer.position(), buffer.remaining());
return new BytesArray(array);
}
}

/**
Expand Down

0 comments on commit dcc6d72

Please sign in to comment.