Skip to content

Commit

Permalink
Support Multipart with Buffered Entity and Netty Connector
Browse files Browse the repository at this point in the history
Signed-off-by: jansupol <jan.supol@oracle.com>
  • Loading branch information
jansupol committed Jul 3, 2024
1 parent ed233c5 commit 897d3a3
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.innate.VirtualThreadUtil;
import org.glassfish.jersey.internal.util.collection.LazyValue;
import org.glassfish.jersey.internal.util.collection.Value;
import org.glassfish.jersey.internal.util.collection.Values;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
import org.glassfish.jersey.netty.connector.internal.NettyEntityWriter;

Expand All @@ -103,6 +106,17 @@ class NettyConnector implements Connector {
final Client client;
final HashMap<String, ArrayList<Channel>> connections = new HashMap<>();

private static final LazyValue<String> NETTY_VERSION = Values.lazy(
(Value<String>) () -> {
String nettyVersion = null;
try {
nettyVersion = io.netty.util.Version.identify().values().iterator().next().artifactVersion();
} catch (Throwable t) {
nettyVersion = "4.1.x";
}
return "Netty " + nettyVersion;
});

// If HTTP keepalive is enabled the value of "http.maxConnections" determines the maximum number
// of idle connections that will be simultaneously kept alive, per destination.
private static final String HTTP_KEEPALIVE_STRING = System.getProperty("http.keepAlive");
Expand Down Expand Up @@ -524,7 +538,7 @@ private String buildPathWithQueryParameters(URI requestUri) {

@Override
public String getName() {
return "Netty 4.1.x";
return NETTY_VERSION.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -24,8 +24,10 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* The Entity Writer is used to write entity in Netty. One implementation is delayed,
Expand Down Expand Up @@ -196,10 +198,7 @@ private void _flush() throws IOException {
for (Runnable runnable : delayedOps) {
runnable.run();
}

if (outputStream.b != null) {
writer.getOutputStream().write(outputStream.b, outputStream.off, outputStream.len);
}
outputStream._flush();
}
}

Expand All @@ -216,7 +215,7 @@ public OutputStream getOutputStream() {

@Override
public long getLength() {
return outputStream.len - outputStream.off;
return outputStream.writeLen;
}

@Override
Expand All @@ -225,9 +224,9 @@ public Type getType() {
}

private class DelayedOutputStream extends OutputStream {
private byte[] b;
private int off;
private int len;
private final List<WriteAction> actions = new ArrayList<>();
private int writeLen = 0;
private AtomicBoolean streamFlushed = new AtomicBoolean(false);

@Override
public void write(int b) throws IOException {
Expand All @@ -241,15 +240,39 @@ public void write(byte[] b) throws IOException {

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (!flushed && this.b == null) {
this.b = b;
this.off = off;
this.len = len;
if (!flushed) {
actions.add(new WriteAction(b, off, len));
writeLen += len;
} else {
DelayedEntityWriter.this._flush();
_flush();
writer.getOutputStream().write(b, off, len);
writer.getOutputStream().flush();
}
}

public void _flush() throws IOException {
if (streamFlushed.compareAndSet(false, true)) {
DelayedEntityWriter.this._flush();
for (WriteAction action : actions) {
action.run();
}
actions.clear();
}
}
}

private class WriteAction {
private final byte[] b;

private WriteAction(byte[] b, int off, int len) {
this.b = new byte[len]; // b passed in can be reused
System.arraycopy(b, off, this.b, 0, len);
}

public void run() throws IOException {
writer.getOutputStream().write(b, 0, b.length);
writer.getOutputStream().flush();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2024 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,11 +16,17 @@

package org.glassfish.jersey.tests.e2e.client.connector;

import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.HttpUrlConnectorProvider;
import org.glassfish.jersey.client.RequestEntityProcessing;
import org.glassfish.jersey.client.spi.ConnectorProvider;
import org.glassfish.jersey.jdk.connector.JdkConnectorProvider;
import org.glassfish.jersey.jetty.connector.JettyConnectorProvider;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.netty.connector.NettyConnectorProvider;
import org.glassfish.jersey.logging.LoggingFeature;
import org.glassfish.jersey.media.multipart.BodyPart;
Expand All @@ -32,6 +38,8 @@
import org.glassfish.jersey.test.JerseyTest;
import org.glassfish.jersey.test.TestProperties;
import org.glassfish.jersey.test.spi.TestHelper;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DynamicContainer;
import org.junit.jupiter.api.Test;
Expand All @@ -40,18 +48,27 @@
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;
import javax.ws.rs.ext.WriterInterceptor;
import javax.ws.rs.ext.WriterInterceptorContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;

public class MultiPartTest {
Expand Down Expand Up @@ -129,5 +146,72 @@ public void testMultipart() {
}
}
}

@Test
public void testNettyBufferedMultipart() {
// setDebugLevel(Level.FINEST);
ClientConfig config = new ClientConfig();

config.connectorProvider(new NettyConnectorProvider());
config.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.BUFFERED);
config.register(org.glassfish.jersey.media.multipart.MultiPartFeature.class);
config.register(new LoggingHandler(LogLevel.DEBUG));
config.register(new LoggingInterceptor());
config.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 10);
config.property("jersey.config.client.logging.verbosity", LoggingFeature.Verbosity.PAYLOAD_TEXT);
config.property("jersey.config.client.logging.logger.level", Level.FINEST.toString());

Client client = ClientBuilder.newClient(config);

FormDataMultiPart formData = new FormDataMultiPart();
FormDataBodyPart bodyPart1 = new FormDataBodyPart("hello1", "{\"first\":\"firstLine\",\"second\":\"secondLine\"}",
MediaType.APPLICATION_JSON_TYPE);
formData.bodyPart(bodyPart1);
formData.bodyPart(new FormDataBodyPart("hello2",
"{\"first\":\"firstLine\",\"second\":\"secondLine\",\"third\":\"thirdLine\"}",
MediaType.APPLICATION_JSON_TYPE));
formData.bodyPart(new FormDataBodyPart("hello3",
"{\"first\":\"firstLine\",\"second\":\"secondLine\",\""
+ "second\":\"secondLine\",\"second\":\"secondLine\",\"second\":\"secondLine\"}",
MediaType.APPLICATION_JSON_TYPE));
formData.bodyPart(new FormDataBodyPart("plaintext", "hello"));

Response response1 = client.target(target().getUri()).path("upload")
.request()
.post(Entity.entity(formData, formData.getMediaType()));

MatcherAssert.assertThat(response1.getStatus(), Matchers.is(200));
MatcherAssert.assertThat(response1.readEntity(String.class),
Matchers.stringContainsInOrder("first", "firstLine", "second", "secondLine"));
response1.close();
client.close();
}

public static void setDebugLevel(Level newLvl) {
Logger rootLogger = LogManager.getLogManager().getLogger("");
Handler[] handlers = rootLogger.getHandlers();
rootLogger.setLevel(newLvl);
for (Handler h : handlers) {
h.setLevel(Level.ALL);
}
Logger nettyLogger = Logger.getLogger("io.netty");
nettyLogger.setLevel(Level.FINEST);
}

@Provider
public class LoggingInterceptor implements WriterInterceptor {

@Override
public void aroundWriteTo(WriterInterceptorContext context)
throws IOException, WebApplicationException {
try {
MultivaluedMap<String, Object> headers = context.getHeaders();
headers.forEach((key, val) -> System.out.println(key + ":" + val));
context.proceed();
} catch (Exception e) {
throw e;
}
}
}
}
}

0 comments on commit 897d3a3

Please sign in to comment.