Skip to content

Commit

Permalink
Deferred Executor
Browse files Browse the repository at this point in the history
* deferred API: global executor fix #488
* deferred API: local executor fix #489
  • Loading branch information
jknack committed Oct 2, 2016
1 parent 34150b0 commit 4828f3f
Show file tree
Hide file tree
Showing 20 changed files with 715 additions and 122 deletions.
41 changes: 41 additions & 0 deletions coverage-report/src/test/java/org/jooby/issues/Issue484.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.jooby.issues;

import static org.junit.Assert.assertEquals;

import org.jooby.Deferred;
import org.jooby.test.ServerFeature;
import org.junit.Test;

public class Issue484 extends ServerFeature {

{
get("/484", req -> {
String t1 = Thread.currentThread().getName();
return new Deferred(deferred -> {
deferred.resolve(t1 + ":" + Thread.currentThread().getName());
});
});

get("/484/promise", promise(deferred -> {
String t1 = Thread.currentThread().getName();
deferred.resolve(t1 + ":" + Thread.currentThread().getName());
}));
}

@Test
public void deferredOnDefaultExecutor() throws Exception {
request()
.get("/484")
.expect(rsp -> {
String[] threads = rsp.split(":");
assertEquals(threads[0], threads[1]);
});

request()
.get("/484/promise")
.expect(rsp -> {
String[] threads = rsp.split(":");
assertEquals(threads[0], threads[1]);
});
}
}
45 changes: 45 additions & 0 deletions coverage-report/src/test/java/org/jooby/issues/Issue484b.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.jooby.issues;

import static org.junit.Assert.assertNotEquals;

import java.util.concurrent.ForkJoinPool;

import org.jooby.Deferred;
import org.jooby.test.ServerFeature;
import org.junit.Test;

public class Issue484b extends ServerFeature {

{
executor(new ForkJoinPool());

get("/484", req -> {
return new Deferred(deferred -> {
deferred.resolve(deferred.callerThread() + ":" + Thread.currentThread().getName());
});
});

get("/484/promise", promise(deferred -> {
deferred.resolve(deferred.callerThread() + ":" + Thread.currentThread().getName());
}));
}

@Test
public void deferredWithExecutorInstance() throws Exception {
request()
.get("/484")
.expect(rsp -> {
System.out.println(rsp);
String[] threads = rsp.split(":");
assertNotEquals(threads[0], threads[1]);
});

request()
.get("/484/promise")
.expect(rsp -> {
System.out.println(rsp);
String[] threads = rsp.split(":");
assertNotEquals(threads[0], threads[1]);
});
}
}
54 changes: 54 additions & 0 deletions coverage-report/src/test/java/org/jooby/issues/Issue484c.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.jooby.issues;

import static org.junit.Assert.assertNotEquals;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.jooby.Deferred;
import org.jooby.test.ServerFeature;
import org.junit.Test;

import com.google.inject.Key;
import com.google.inject.name.Names;

public class Issue484c extends ServerFeature {

{
executor("ste");

use((env, conf, binder) -> {
binder.bind(Key.get(Executor.class, Names.named("ste")))
.toInstance(Executors.newSingleThreadExecutor());
});

get("/484", req -> {
return new Deferred(deferred -> {
deferred.resolve(deferred.callerThread() + ":" + Thread.currentThread().getName());
});
});

get("/484/promise", promise((req, deferred) -> {
deferred.resolve(deferred.callerThread() + ":" + Thread.currentThread().getName());
}));
}

@Test
public void deferredWithExecutorReference() throws Exception {
request()
.get("/484")
.expect(rsp -> {
System.out.println(rsp);
String[] threads = rsp.split(":");
assertNotEquals(threads[0], threads[1]);
});

request()
.get("/484/promise")
.expect(rsp -> {
System.out.println(rsp);
String[] threads = rsp.split(":");
assertNotEquals(threads[0], threads[1]);
});
}
}
77 changes: 77 additions & 0 deletions coverage-report/src/test/java/org/jooby/issues/Issue484d.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.jooby.issues;

import static org.junit.Assert.assertTrue;

import org.jooby.Deferred;
import org.jooby.exec.Exec;
import org.jooby.test.ServerFeature;
import org.junit.Test;

import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

public class Issue484d extends ServerFeature {

{
use(ConfigFactory.empty()
.withValue("executors.fj", ConfigValueFactory.fromAnyRef("forkjoin = 2"))
.withValue("executors.cached", ConfigValueFactory.fromAnyRef("cached")));

executor("fj");

use(new Exec());

get("/484", req -> new Deferred(deferred -> {
deferred.resolve(Thread.currentThread().getName());
}));

get("/484/cached", req -> new Deferred("cached", deferred -> {
deferred.resolve(Thread.currentThread().getName());
}));

get("/484/fj", promise(deferred -> {
deferred.resolve(Thread.currentThread().getName());
}));

get("/484/local/cached", promise("cached", (req, deferred) -> {
deferred.resolve(Thread.currentThread().getName());
}));

get("/484/local/fj", promise("fj", deferred -> {
deferred.resolve(Thread.currentThread().getName());
}));
}

@Test
public void deferredOnGloablOrLocalExecutor() throws Exception {
request()
.get("/484")
.expect(rsp -> {
assertTrue(rsp.startsWith("forkjoin"));
});

request()
.get("/484/cached")
.expect(rsp -> {
assertTrue(rsp.startsWith("cached"));
});

request()
.get("/484/fj")
.expect(rsp -> {
assertTrue(rsp.startsWith("forkjoin"));
});

request()
.get("/484/local/cached")
.expect(rsp -> {
assertTrue(rsp.startsWith("cached"));
});

request()
.get("/484/local/fj")
.expect(rsp -> {
assertTrue(rsp.startsWith("forkjoin"));
});
}
}
41 changes: 41 additions & 0 deletions coverage-report/src/test/java/org/jooby/issues/Issue485.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package org.jooby.issues;

import static org.junit.Assert.assertTrue;

import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

import org.jooby.test.ServerFeature;
import org.junit.Test;

public class Issue485 extends ServerFeature {

{
executor(new ForkJoinPool());
executor("cached", Executors.newCachedThreadPool());

get("/485/fj", promise(deferred -> {
deferred.resolve(Thread.currentThread().getName());
}));

get("/485/cached", promise("cached", deferred -> {
deferred.resolve(Thread.currentThread().getName());
}));

}

@Test
public void globalOrLocalExecutor() throws Exception {
request()
.get("/485/fj")
.expect(rsp -> {
assertTrue(rsp.toLowerCase().startsWith("forkjoinpool"));
});

request()
.get("/485/cached")
.expect(rsp -> {
assertTrue(rsp.toLowerCase().startsWith("pool"));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,7 @@ public void handle(final String target, final Request baseRequest,
}

ServletServletRequest nreq = new ServletServletRequest(request, tmpdir, multipart)
.with(new ServletUpgrade() {

@SuppressWarnings("unchecked")
@Override
public <T> T upgrade(final Class<T> type) throws Exception {
if (type == NativeWebSocket.class
&& webSocketServerFactory.isUpgradeRequest(request, response)
&& webSocketServerFactory.acceptWebSocket(request, response)) {
String key = JettyWebSocket.class.getName();
NativeWebSocket ws = (NativeWebSocket) request.getAttribute(key);
if (ws != null) {
request.removeAttribute(key);
return (T) ws;
}
} else if (type == Sse.class) {
return (T) new JettySse(baseRequest, (Response) response);
} else if (type == NativePushPromise.class) {
return (T) new JettyPush(baseRequest);
}
throw new UnsupportedOperationException("Not Supported: " + type);
}
});
.with(upgrade(baseRequest, request, response, webSocketServerFactory));
dispatcher.handle(nreq, new JettyResponse(nreq, response));
} catch (IOException | ServletException | RuntimeException ex) {
baseRequest.setHandled(false);
Expand All @@ -112,4 +91,29 @@ public <T> T upgrade(final Class<T> type) throws Exception {
}
}

private static ServletUpgrade upgrade(final Request baseRequest, final HttpServletRequest request,
final HttpServletResponse response, final WebSocketServerFactory webSocketServerFactory) {
return new ServletUpgrade() {
@SuppressWarnings("unchecked")
@Override
public <T> T upgrade(final Class<T> type) throws Exception {
if (type == NativeWebSocket.class
&& webSocketServerFactory.isUpgradeRequest(request, response)
&& webSocketServerFactory.acceptWebSocket(request, response)) {
String key = JettyWebSocket.class.getName();
NativeWebSocket ws = (NativeWebSocket) request.getAttribute(key);
if (ws != null) {
request.removeAttribute(key);
return (T) ws;
}
} else if (type == Sse.class) {
return (T) new JettySse(baseRequest, (Response) response);
} else if (type == NativePushPromise.class) {
return (T) new JettyPush(baseRequest);
}
throw new UnsupportedOperationException("Not Supported: " + type);
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.server.HttpOutput;
Expand Down Expand Up @@ -60,7 +61,7 @@ public void send(final ByteBuffer buffer) throws Exception {
@Override
public void send(final InputStream stream) throws Exception {
endRequest = false;
nreq.startAsync();
startAsyncIfNeedIt();
sender().sendContent(Channels.newChannel(stream), this);
}

Expand All @@ -72,7 +73,7 @@ public void send(final FileChannel channel) throws Exception {
sender().sendContent(channel);
} else {
endRequest = false;
nreq.startAsync();
startAsyncIfNeedIt();
sender().sendContent(channel, this);
}
}
Expand Down Expand Up @@ -107,4 +108,10 @@ private HttpOutput sender() {
return ((Response) rsp).getHttpOutput();
}

private void startAsyncIfNeedIt() {
HttpServletRequest req = nreq.servletRequest();
if (!req.isAsyncStarted()) {
req.startAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
Expand Down Expand Up @@ -116,9 +117,14 @@ private Server server(final HttpHandler handler, final Config conf,
return ws;
});

server.setHandler(new JettyHandler(handler, webSocketServerFactory, conf
ContextHandler sch = new ContextHandler();
// always '/' context path is internally handle by jooby
sch.setContextPath("/");
sch.setHandler(new JettyHandler(handler, webSocketServerFactory, conf
.getString("application.tmpdir"), conf.getBytes("jetty.FileSizeThreshold").intValue()));

server.setHandler(sch);

return server;
}

Expand Down
Loading

0 comments on commit 4828f3f

Please sign in to comment.