diff --git a/storm-webapp/pom.xml b/storm-webapp/pom.xml index 52e1eb4de98..9540358caf7 100644 --- a/storm-webapp/pom.xml +++ b/storm-webapp/pom.xml @@ -15,157 +15,159 @@ See the License for the specific language governing permissions and limitations under the License. --> - - 4.0.0 - - storm - org.apache.storm - 2.0.0-SNAPSHOT - .. - + + 4.0.0 + + storm + org.apache.storm + 2.0.0-SNAPSHOT + .. + - storm-webapp - jar - Storm Webapp - Webapp Servers for Apache Storm + storm-webapp + jar + Storm Webapp + Webapp Servers for Apache Storm - - - - org.apache.storm - storm-core - 2.0.0-SNAPSHOT - provided - - - com.googlecode.json-simple - json-simple - compile - - - log4j - log4j - 1.2.17 - test - - - com.google.guava - guava - compile - - - org.apache.logging.log4j - log4j-slf4j-impl - - - io.dropwizard.metrics - metrics-core - - - - junit - junit - test - - - org.mockito - mockito-all - test - - - javax.servlet - servlet-api - - - org.slf4j - slf4j-api - - - org.glassfish.jersey.core - jersey-server - - - org.glassfish.jersey.containers - jersey-container-servlet-core - - - org.glassfish.jersey.containers - jersey-container-jetty-http - - - - - - org.apache.maven.plugins - maven-surefire-report-plugin - - - ${project.build.directory}/test-reports - - - - - org.apache.maven.plugins - maven-failsafe-plugin - - ${project.build.directory}/test-reports - - - - org.apache.maven.plugins - maven-checkstyle-plugin - - - 290 - - - - maven-dependency-plugin - 2.8 - - - copy-dependencies - package - - copy-dependencies - - - false - false - true - runtime - - - - - - org.apache.maven.plugins - maven-source-plugin - 2.2.1 - - - attach-sources - - - jar-no-fork - - - - - - org.apache.maven.plugins - maven-jar-plugin - 2.6 - - - - test-jar - - - - - - + + + + org.apache.storm + storm-core + 2.0.0-SNAPSHOT + provided + + + com.googlecode.json-simple + json-simple + compile + + + log4j + log4j + 1.2.17 + test + + + com.google.guava + guava + compile + + + org.apache.logging.log4j + log4j-slf4j-impl + + + io.dropwizard.metrics + metrics-core + + + + junit + junit + test + + + org.mockito + mockito-all + test + + + javax.servlet + servlet-api + + + org.slf4j + slf4j-api + + + org.glassfish.jersey.core + jersey-server + + + org.glassfish.jersey.containers + jersey-container-servlet-core + + + org.glassfish.jersey.containers + jersey-container-jetty-http + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + ${project.build.directory}/test-reports + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + ${project.build.directory}/test-reports + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + 67 + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + false + false + true + runtime + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.6 + + + + test-jar + + + + + + diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java index 3e37ad7fd1d..5d41f5e71f1 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java @@ -17,10 +17,11 @@ */ package org.apache.storm.daemon.drpc; +import com.codahale.metrics.Meter; +import com.google.common.annotations.VisibleForTesting; import java.util.Arrays; import java.util.List; import java.util.Map; - import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.daemon.drpc.webapp.DRPCApplication; @@ -45,170 +46,180 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; -import com.google.common.annotations.VisibleForTesting; - public class DRPCServer implements AutoCloseable { - private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class); - private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls"); - - //TODO in the future this might be better in a common webapp location - public static void addRequestContextFilter(ServletContextHandler context, String configName, Map conf) { - IHttpCredentialsPlugin auth = AuthUtils.GetHttpCredentialsPlugin(conf, (String)conf.get(configName)); - ReqContextFilter filter = new ReqContextFilter(auth); - context.addFilter(new FilterHolder(filter), "/*", FilterMapping.ALL); - } - - private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, Map conf) { - ThriftServer ret = null; - if (port != null && port >= 0) { - ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service), - ThriftConnectionType.DRPC); - } - return ret; - } - private static ThriftServer mkInvokeServer(final DistributedRPCInvocations.Iface service, int port, Map conf) { - return new ThriftServer(conf, new DistributedRPCInvocations.Processor<>(service), - ThriftConnectionType.DRPC_INVOCATIONS); - } - - private static Server mkHttpServer(Map conf, DRPC drpc) { - Integer drpcHttpPort = (Integer) conf.get(DaemonConfig.DRPC_HTTP_PORT); - Server ret = null; - if (drpcHttpPort != null && drpcHttpPort >= 0) { - LOG.info("Starting RPC HTTP servers..."); - String filterClass = (String) (conf.get(DaemonConfig.DRPC_HTTP_FILTER)); - @SuppressWarnings("unchecked") - Map filterParams = (Map) (conf.get(DaemonConfig.DRPC_HTTP_FILTER_PARAMS)); - FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams); - final List filterConfigurations = Arrays.asList(filterConfiguration); - final Integer httpsPort = ObjectReader.getInt(conf.get(DaemonConfig.DRPC_HTTPS_PORT), 0); - final String httpsKsPath = (String) (conf.get(DaemonConfig.DRPC_HTTPS_KEYSTORE_PATH)); - final String httpsKsPassword = (String) (conf.get(DaemonConfig.DRPC_HTTPS_KEYSTORE_PASSWORD)); - final String httpsKsType = (String) (conf.get(DaemonConfig.DRPC_HTTPS_KEYSTORE_TYPE)); - final String httpsKeyPassword = (String) (conf.get(DaemonConfig.DRPC_HTTPS_KEY_PASSWORD)); - final String httpsTsPath = (String) (conf.get(DaemonConfig.DRPC_HTTPS_TRUSTSTORE_PATH)); - final String httpsTsPassword = (String) (conf.get(DaemonConfig.DRPC_HTTPS_TRUSTSTORE_PASSWORD)); - final String httpsTsType = (String) (conf.get(DaemonConfig.DRPC_HTTPS_TRUSTSTORE_TYPE)); - final Boolean httpsWantClientAuth = (Boolean) (conf.get(DaemonConfig.DRPC_HTTPS_WANT_CLIENT_AUTH)); - final Boolean httpsNeedClientAuth = (Boolean) (conf.get(DaemonConfig.DRPC_HTTPS_NEED_CLIENT_AUTH)); - - //TODO a better way to do this would be great. - DRPCApplication.setup(drpc); - ret = UIHelpers.jettyCreateServer(drpcHttpPort, null, httpsPort); - - UIHelpers.configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, httpsTsPath, httpsTsPassword, httpsTsType, - httpsNeedClientAuth, httpsWantClientAuth); - - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); - context.setContextPath("/"); - ret.setHandler(context); - - ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/*"); - jerseyServlet.setInitOrder(1); - jerseyServlet.setInitParameter("javax.ws.rs.Application", DRPCApplication.class.getName()); - - UIHelpers.configFilters(context, filterConfigurations); - addRequestContextFilter(context, Config.DRPC_HTTP_CREDS_PLUGIN, conf); - } - return ret; - } - - private final DRPC _drpc; - private final ThriftServer _handlerServer; - private final ThriftServer _invokeServer; - private final Server _httpServer; - private Thread _handlerServerThread; - private boolean _closed = false; - - public DRPCServer(Map conf) { - _drpc = new DRPC(conf); - DRPCThrift thrift = new DRPCThrift(_drpc); - _handlerServer = mkHandlerServer(thrift, ObjectReader.getInt(conf.get(Config.DRPC_PORT), null), conf); - _invokeServer = mkInvokeServer(thrift, ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT), 3773), conf); - _httpServer = mkHttpServer(conf, _drpc); - } + private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class); + private final static Meter meterShutdownCalls = StormMetricsRegistry + .registerMeter("drpc:num-shutdown-calls"); + private final DRPC _drpc; + private final ThriftServer _handlerServer; + private final ThriftServer _invokeServer; + private final Server _httpServer; + private Thread _handlerServerThread; + private boolean _closed = false; + public DRPCServer(Map conf) { + _drpc = new DRPC(conf); + DRPCThrift thrift = new DRPCThrift(_drpc); + _handlerServer = mkHandlerServer(thrift, ObjectReader.getInt(conf.get(Config.DRPC_PORT), null), + conf); + _invokeServer = mkInvokeServer(thrift, + ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT), 3773), conf); + _httpServer = mkHttpServer(conf, _drpc); + } + + //TODO in the future this might be better in a common webapp location + public static void addRequestContextFilter(ServletContextHandler context, String configName, + Map conf) { + IHttpCredentialsPlugin auth = AuthUtils + .GetHttpCredentialsPlugin(conf, (String) conf.get(configName)); + ReqContextFilter filter = new ReqContextFilter(auth); + context.addFilter(new FilterHolder(filter), "/*", FilterMapping.ALL); + } - @VisibleForTesting - void start() throws Exception { - LOG.info("Starting Distributed RPC servers..."); - new Thread(() -> _invokeServer.serve()).start(); - - if (_httpServer != null) { - _httpServer.start(); - } - - if (_handlerServer != null) { - _handlerServerThread = new Thread(_handlerServer::serve); - _handlerServerThread.start(); - } + private static ThriftServer mkHandlerServer(final DistributedRPC.Iface service, Integer port, + Map conf) { + ThriftServer ret = null; + if (port != null && port >= 0) { + ret = new ThriftServer(conf, new DistributedRPC.Processor<>(service), + ThriftConnectionType.DRPC); } - - @VisibleForTesting - void awaitTermination() throws InterruptedException { - if(_handlerServerThread != null) { - _handlerServerThread.join(); - } else { - _httpServer.join(); - } + return ret; + } + + private static ThriftServer mkInvokeServer(final DistributedRPCInvocations.Iface service, + int port, Map conf) { + return new ThriftServer(conf, new DistributedRPCInvocations.Processor<>(service), + ThriftConnectionType.DRPC_INVOCATIONS); + } + + private static Server mkHttpServer(Map conf, DRPC drpc) { + Integer drpcHttpPort = (Integer) conf.get(DaemonConfig.DRPC_HTTP_PORT); + Server ret = null; + if (drpcHttpPort != null && drpcHttpPort >= 0) { + LOG.info("Starting RPC HTTP servers..."); + String filterClass = (String) (conf.get(DaemonConfig.DRPC_HTTP_FILTER)); + @SuppressWarnings("unchecked") + Map filterParams = (Map) (conf + .get(DaemonConfig.DRPC_HTTP_FILTER_PARAMS)); + FilterConfiguration filterConfiguration = new FilterConfiguration(filterClass, filterParams); + final List filterConfigurations = Arrays.asList(filterConfiguration); + final Integer httpsPort = ObjectReader.getInt(conf.get(DaemonConfig.DRPC_HTTPS_PORT), 0); + final String httpsKsPath = (String) (conf.get(DaemonConfig.DRPC_HTTPS_KEYSTORE_PATH)); + final String httpsKsPassword = (String) (conf.get(DaemonConfig.DRPC_HTTPS_KEYSTORE_PASSWORD)); + final String httpsKsType = (String) (conf.get(DaemonConfig.DRPC_HTTPS_KEYSTORE_TYPE)); + final String httpsKeyPassword = (String) (conf.get(DaemonConfig.DRPC_HTTPS_KEY_PASSWORD)); + final String httpsTsPath = (String) (conf.get(DaemonConfig.DRPC_HTTPS_TRUSTSTORE_PATH)); + final String httpsTsPassword = (String) (conf + .get(DaemonConfig.DRPC_HTTPS_TRUSTSTORE_PASSWORD)); + final String httpsTsType = (String) (conf.get(DaemonConfig.DRPC_HTTPS_TRUSTSTORE_TYPE)); + final Boolean httpsWantClientAuth = (Boolean) (conf + .get(DaemonConfig.DRPC_HTTPS_WANT_CLIENT_AUTH)); + final Boolean httpsNeedClientAuth = (Boolean) (conf + .get(DaemonConfig.DRPC_HTTPS_NEED_CLIENT_AUTH)); + + //TODO a better way to do this would be great. + DRPCApplication.setup(drpc); + ret = UIHelpers.jettyCreateServer(drpcHttpPort, null, httpsPort); + + UIHelpers + .configSsl(ret, httpsPort, httpsKsPath, httpsKsPassword, httpsKsType, httpsKeyPassword, + httpsTsPath, httpsTsPassword, httpsTsType, + httpsNeedClientAuth, httpsWantClientAuth); + + ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + context.setContextPath("/"); + ret.setHandler(context); + + ServletHolder jerseyServlet = context.addServlet(ServletContainer.class, "/*"); + jerseyServlet.setInitOrder(1); + jerseyServlet.setInitParameter("javax.ws.rs.Application", DRPCApplication.class.getName()); + + UIHelpers.configFilters(context, filterConfigurations); + addRequestContextFilter(context, Config.DRPC_HTTP_CREDS_PLUGIN, conf); } + return ret; + } - @Override - public synchronized void close() { - if (!_closed) { - //This is kind of useless... - meterShutdownCalls.mark(); - - if (_handlerServer != null) { - _handlerServer.stop(); - } - - if (_invokeServer != null) { - _invokeServer.stop(); - } - - //TODO this is causing issues... - //if (_httpServer != null) { - // _httpServer.destroy(); - //} - - _drpc.close(); - _closed = true; - } + public static void main(String[] args) throws Exception { + Utils.setupDefaultUncaughtExceptionHandler(); + Map conf = Utils.readStormConfig(); + try (DRPCServer server = new DRPCServer(conf)) { + Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close()); + StormMetricsRegistry.startMetricsReporters(conf); + server.start(); + server.awaitTermination(); } - - /** - * @return The port the DRPC handler server is listening on - */ - public int getDRPCPort() { - return _handlerServer.getPort(); + } + + @VisibleForTesting + void start() throws Exception { + LOG.info("Starting Distributed RPC servers..."); + new Thread(() -> _invokeServer.serve()).start(); + + if (_httpServer != null) { + _httpServer.start(); } - - /** - * @return The port the DRPC invoke server is listening on - */ - public int getDRPCInvokePort() { - return _invokeServer.getPort(); + + if (_handlerServer != null) { + _handlerServerThread = new Thread(_handlerServer::serve); + _handlerServerThread.start(); } - - /** - * @return The port the HTTP server is listening on. Not available until {@link #start() } has run. - */ - public int getHttpServerPort() { - assert _httpServer.getConnectors().length == 1; - - return _httpServer.getConnectors()[0].getLocalPort(); + } + + @VisibleForTesting + void awaitTermination() throws InterruptedException { + if (_handlerServerThread != null) { + _handlerServerThread.join(); + } else { + _httpServer.join(); } - - public static void main(String [] args) throws Exception { - Utils.setupDefaultUncaughtExceptionHandler(); - Map conf = Utils.readStormConfig(); - try (DRPCServer server = new DRPCServer(conf)) { - Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close()); - StormMetricsRegistry.startMetricsReporters(conf); - server.start(); - server.awaitTermination(); - } + } + + @Override + public synchronized void close() { + if (!_closed) { + //This is kind of useless... + meterShutdownCalls.mark(); + + if (_handlerServer != null) { + _handlerServer.stop(); + } + + if (_invokeServer != null) { + _invokeServer.stop(); + } + + //TODO this is causing issues... + //if (_httpServer != null) { + // _httpServer.destroy(); + //} + + _drpc.close(); + _closed = true; } + } + + /** + * @return The port the DRPC handler server is listening on + */ + public int getDRPCPort() { + return _handlerServer.getPort(); + } + + /** + * @return The port the DRPC invoke server is listening on + */ + public int getDRPCInvokePort() { + return _invokeServer.getPort(); + } + + /** + * @return The port the HTTP server is listening on. Not available until {@link #start() } has + * run. + */ + public int getHttpServerPort() { + assert _httpServer.getConnectors().length == 1; + + return _httpServer.getConnectors()[0].getLocalPort(); + } } diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java index ef1e4293d9f..f6a5dce5c37 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/AuthorizationExceptionMapper.java @@ -19,21 +19,21 @@ import java.util.HashMap; import java.util.Map; - import javax.ws.rs.core.Response; import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; - import org.apache.storm.generated.AuthorizationException; import org.json.simple.JSONValue; @Provider public class AuthorizationExceptionMapper implements ExceptionMapper { - @Override - public Response toResponse(AuthorizationException ex) { - Map body = new HashMap<>(); - body.put("error", "Not Authorized"); - body.put("errorMessage", ex.get_msg()); - return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json").build(); - } + + @Override + public Response toResponse(AuthorizationException ex) { + Map body = new HashMap<>(); + body.put("error", "Not Authorized"); + body.put("errorMessage", ex.get_msg()); + return Response.status(403).entity(JSONValue.toJSONString(body)).type("application/json") + .build(); + } } diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java index da77a49b907..cc9a4875d15 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCApplication.java @@ -17,31 +17,30 @@ */ package org.apache.storm.daemon.drpc.webapp; -import org.apache.storm.daemon.drpc.DRPC; - import java.util.HashSet; import java.util.Set; - import javax.ws.rs.ApplicationPath; import javax.ws.rs.core.Application; +import org.apache.storm.daemon.drpc.DRPC; @ApplicationPath("") public class DRPCApplication extends Application { - private static DRPC _drpc; - private final Set singletons = new HashSet(); - - public DRPCApplication() { - singletons.add(new DRPCResource(_drpc)); - singletons.add(new DRPCExceptionMapper()); - singletons.add(new AuthorizationExceptionMapper()); - } - - @Override - public Set getSingletons() { - return singletons; - } - public static void setup(DRPC drpc) { - _drpc = drpc; - } + private static DRPC _drpc; + private final Set singletons = new HashSet(); + + public DRPCApplication() { + singletons.add(new DRPCResource(_drpc)); + singletons.add(new DRPCExceptionMapper()); + singletons.add(new AuthorizationExceptionMapper()); + } + + public static void setup(DRPC drpc) { + _drpc = drpc; + } + + @Override + public Set getSingletons() { + return singletons; + } } \ No newline at end of file diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java index 34b1f8da988..c5437a2698e 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCExceptionMapper.java @@ -19,43 +19,41 @@ import java.util.HashMap; import java.util.Map; - import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; - import org.apache.storm.generated.DRPCExecutionException; import org.json.simple.JSONValue; @Provider public class DRPCExceptionMapper implements ExceptionMapper { - @Override - public Response toResponse(DRPCExecutionException ex) { - ResponseBuilder builder = Response.status(500); - switch (ex.get_type()) { - case FAILED_REQUEST: - builder.status(400); - break; - case SERVER_SHUTDOWN: - builder.status(503); //Not available - break; - case SERVER_TIMEOUT: - builder.status(504); //proxy timeout - break; - case INTERNAL_ERROR: - //fall throw on purpose - default: - //Empty (Still 500) - break; - - } - Map body = new HashMap<>(); - //TODO I would love to standardize this... - body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error"); - body.put("errorMessage", ex.get_msg()); - return builder.entity(JSONValue.toJSONString(body)).type("application/json").build(); + @Override + public Response toResponse(DRPCExecutionException ex) { + ResponseBuilder builder = Response.status(500); + switch (ex.get_type()) { + case FAILED_REQUEST: + builder.status(400); + break; + case SERVER_SHUTDOWN: + builder.status(503); //Not available + break; + case SERVER_TIMEOUT: + builder.status(504); //proxy timeout + break; + case INTERNAL_ERROR: + //fall throw on purpose + default: + //Empty (Still 500) + break; + } + Map body = new HashMap<>(); + //TODO I would love to standardize this... + body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error"); + body.put("errorMessage", ex.get_msg()); + return builder.entity(JSONValue.toJSONString(body)).type("application/json").build(); + } } diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java index 37088674f95..2e0c824e78b 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java @@ -17,47 +17,51 @@ */ package org.apache.storm.daemon.drpc.webapp; +import com.codahale.metrics.Meter; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.core.Context; - import org.apache.storm.daemon.drpc.DRPC; import org.apache.storm.metric.StormMetricsRegistry; import org.apache.thrift.TException; -import com.codahale.metrics.Meter; - @Path("/drpc/") public class DRPCResource { - private static final Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests"); - private final DRPC _drpc; - public DRPCResource(DRPC drpc) { - _drpc = drpc; - } - - //TODO put in some better exception mapping... - //TODO move populateContext to a filter... - @POST - @Path("/{func}") - public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws TException { - meterHttpRequests.mark(); - return _drpc.executeBlocking(func, args); - } - - @GET - @Path("/{func}/{args}") - public String get(@PathParam("func") String func, @PathParam("args") String args, @Context HttpServletRequest request) throws TException { - meterHttpRequests.mark(); - return _drpc.executeBlocking(func, args); - } - - @GET - @Path("/{func}") - public String get(@PathParam("func") String func, @Context HttpServletRequest request) throws TException { - meterHttpRequests.mark(); - return _drpc.executeBlocking(func, ""); - } + + private static final Meter meterHttpRequests = StormMetricsRegistry + .registerMeter("drpc:num-execute-http-requests"); + private final DRPC _drpc; + + public DRPCResource(DRPC drpc) { + _drpc = drpc; + } + + //TODO put in some better exception mapping... + //TODO move populateContext to a filter... + @POST + @Path("/{func}") + public String post(@PathParam("func") String func, String args, + @Context HttpServletRequest request) throws TException { + meterHttpRequests.mark(); + return _drpc.executeBlocking(func, args); + } + + @GET + @Path("/{func}/{args}") + public String get(@PathParam("func") String func, @PathParam("args") String args, + @Context HttpServletRequest request) throws TException { + meterHttpRequests.mark(); + return _drpc.executeBlocking(func, args); + } + + @GET + @Path("/{func}") + public String get(@PathParam("func") String func, @Context HttpServletRequest request) + throws TException { + meterHttpRequests.mark(); + return _drpc.executeBlocking(func, ""); + } } diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java index 8dcd098ad33..e3ffe69c047 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/ReqContextFilter.java @@ -16,8 +16,8 @@ * limitations under the License. */ package org.apache.storm.daemon.drpc.webapp; -import java.io.IOException; +import java.io.IOException; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -26,44 +26,48 @@ import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - import org.apache.storm.security.auth.IHttpCredentialsPlugin; import org.apache.storm.security.auth.ReqContext; public class ReqContextFilter implements Filter { - private final IHttpCredentialsPlugin _httpCredsHandler; - public ReqContextFilter(IHttpCredentialsPlugin httpCredsHandler) { - _httpCredsHandler = httpCredsHandler; - } - - /** - * Populate the Storm RequestContext from an servlet request. This should be called in each handler - * @param request the request to populate - */ - public void populateContext(HttpServletRequest request) { - if (_httpCredsHandler != null) { - _httpCredsHandler.populateContext(ReqContext.context(), request); - } - } - - public void init(FilterConfig config) throws ServletException { - //NOOP - //We could add in configs through the web.xml if we wanted something stand alone here... - } + private final IHttpCredentialsPlugin _httpCredsHandler; - public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { - handle((HttpServletRequest)request, (HttpServletResponse)response, chain); - } + public ReqContextFilter(IHttpCredentialsPlugin httpCredsHandler) { + _httpCredsHandler = httpCredsHandler; + } - public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) throws IOException, ServletException{ - if (request != null) { - populateContext(request); - } - chain.doFilter(request, response); + /** + * Populate the Storm RequestContext from an servlet request. This should be called in each + * handler + * + * @param request the request to populate + */ + public void populateContext(HttpServletRequest request) { + if (_httpCredsHandler != null) { + _httpCredsHandler.populateContext(ReqContext.context(), request); } + } + + public void init(FilterConfig config) throws ServletException { + //NOOP + //We could add in configs through the web.xml if we wanted something stand alone here... + } - public void destroy() { - //NOOP + public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) + throws IOException, ServletException { + handle((HttpServletRequest) request, (HttpServletResponse) response, chain); + } + + public void handle(HttpServletRequest request, HttpServletResponse response, FilterChain chain) + throws IOException, ServletException { + if (request != null) { + populateContext(request); } + chain.doFilter(request, response); + } + + public void destroy() { + //NOOP + } } diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java index 76652e29b45..fe6aa62d03c 100644 --- a/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java +++ b/storm-webapp/src/test/java/org/apache/storm/daemon/drpc/DRPCServerTest.java @@ -30,7 +30,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; - import org.apache.storm.Config; import org.apache.storm.DaemonConfig; import org.apache.storm.drpc.DRPCInvocationsClient; @@ -45,147 +44,156 @@ public class DRPCServerTest { - private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class); - private static final ExecutorService exec = Executors.newCachedThreadPool(); - - @AfterClass - public static void close() { - exec.shutdownNow(); - } - - private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) throws Exception { - DRPCRequest request = null; - long timedout = System.currentTimeMillis() + 5_000; - while (System.currentTimeMillis() < timedout) { - request = invoke.getClient().fetchRequest(func); - if (request != null && request.get_request_id() != null && !request.get_request_id().isEmpty()) { - return request; - } - Thread.sleep(1); - } - fail("Test timed out waiting for a request on " + func); + + private static final Logger LOG = LoggerFactory.getLogger(DRPCServerTest.class); + private static final ExecutorService exec = Executors.newCachedThreadPool(); + + @AfterClass + public static void close() { + exec.shutdownNow(); + } + + private static DRPCRequest getNextAvailableRequest(DRPCInvocationsClient invoke, String func) + throws Exception { + DRPCRequest request = null; + long timedout = System.currentTimeMillis() + 5_000; + while (System.currentTimeMillis() < timedout) { + request = invoke.getClient().fetchRequest(func); + if (request != null && request.get_request_id() != null && !request.get_request_id() + .isEmpty()) { return request; + } + Thread.sleep(1); } - - private Map getConf(int drpcPort, int invocationsPort, Integer httpPort) { - Map conf = new HashMap<>(); - conf.put(Config.DRPC_PORT, drpcPort); - conf.put(Config.DRPC_INVOCATIONS_PORT, invocationsPort); - conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName()); - conf.put(Config.DRPC_WORKER_THREADS, 5); - conf.put(Config.DRPC_INVOCATIONS_THREADS, 5); - conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576); - conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 2); - conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10); - conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 100); - if (httpPort != null) { - conf.put(DaemonConfig.DRPC_HTTP_PORT, httpPort); - } - return conf; + fail("Test timed out waiting for a request on " + func); + return request; + } + + public static String GET(int port, String func, String args) { + try { + URL url = new URL("http://localhost:" + port + "/drpc/" + func + "/" + args); + InputStream in = url.openStream(); + byte[] buffer = new byte[1024]; + int read = in.read(buffer); + return new String(buffer, 0, read); + } catch (Exception e) { + throw new RuntimeException(e); } - - @Test - public void testGoodThrift() throws Exception { - Map conf = getConf(0, 0, null); - try (DRPCServer server = new DRPCServer(conf)) { - server.start(); - try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDRPCPort()); - DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) { - Future found = exec.submit(() -> client.getClient().execute("testing", "test")); - DRPCRequest request = getNextAvailableRequest(invoke, "testing"); - assertNotNull(request); - assertEquals("test", request.get_func_args()); - assertNotNull(request.get_request_id()); - invoke.result(request.get_request_id(), "tested"); - String result = found.get(1000, TimeUnit.MILLISECONDS); - assertEquals("tested", result); - } - } + } + + private Map getConf(int drpcPort, int invocationsPort, Integer httpPort) { + Map conf = new HashMap<>(); + conf.put(Config.DRPC_PORT, drpcPort); + conf.put(Config.DRPC_INVOCATIONS_PORT, invocationsPort); + conf.put(Config.STORM_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName()); + conf.put(Config.DRPC_WORKER_THREADS, 5); + conf.put(Config.DRPC_INVOCATIONS_THREADS, 5); + conf.put(Config.DRPC_MAX_BUFFER_SIZE, 1048576); + conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 2); + conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10); + conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 100); + if (httpPort != null) { + conf.put(DaemonConfig.DRPC_HTTP_PORT, httpPort); } - - @Test - public void testFailedThrift() throws Exception { - Map conf = getConf(0, 0, null); - try (DRPCServer server = new DRPCServer(conf)) { - server.start(); - try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDRPCPort()); - DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) { - Future found = exec.submit(() -> client.getClient().execute("testing", "test")); - DRPCRequest request = getNextAvailableRequest(invoke, "testing"); - assertNotNull(request); - assertEquals("test", request.get_func_args()); - assertNotNull(request.get_request_id()); - invoke.failRequest(request.get_request_id()); - try { - found.get(1000, TimeUnit.MILLISECONDS); - fail("exec did not throw an exception"); - } catch (ExecutionException e) { - Throwable t = e.getCause(); - assertEquals(t.getClass(), DRPCExecutionException.class); - //Don't know a better way to validate that it failed. - assertEquals("Request failed", ((DRPCExecutionException)t).get_msg()); - } - } - } + return conf; + } + + @Test + public void testGoodThrift() throws Exception { + Map conf = getConf(0, 0, null); + try (DRPCServer server = new DRPCServer(conf)) { + server.start(); + try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDRPCPort()); + DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", + server.getDRPCInvokePort())) { + Future found = exec.submit(() -> client.getClient().execute("testing", "test")); + DRPCRequest request = getNextAvailableRequest(invoke, "testing"); + assertNotNull(request); + assertEquals("test", request.get_func_args()); + assertNotNull(request.get_request_id()); + invoke.result(request.get_request_id(), "tested"); + String result = found.get(1000, TimeUnit.MILLISECONDS); + assertEquals("tested", result); + } } - - public static String GET(int port, String func, String args) { + } + + @Test + public void testFailedThrift() throws Exception { + Map conf = getConf(0, 0, null); + try (DRPCServer server = new DRPCServer(conf)) { + server.start(); + try (DRPCClient client = new DRPCClient(conf, "localhost", server.getDRPCPort()); + DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", + server.getDRPCInvokePort())) { + Future found = exec.submit(() -> client.getClient().execute("testing", "test")); + DRPCRequest request = getNextAvailableRequest(invoke, "testing"); + assertNotNull(request); + assertEquals("test", request.get_func_args()); + assertNotNull(request.get_request_id()); + invoke.failRequest(request.get_request_id()); try { - URL url = new URL("http://localhost:"+port+"/drpc/"+func+"/"+args); - InputStream in = url.openStream(); - byte[] buffer = new byte[1024]; - int read = in.read(buffer); - return new String(buffer, 0, read); - } catch (Exception e) { - throw new RuntimeException(e); + found.get(1000, TimeUnit.MILLISECONDS); + fail("exec did not throw an exception"); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + assertEquals(t.getClass(), DRPCExecutionException.class); + //Don't know a better way to validate that it failed. + assertEquals("Request failed", ((DRPCExecutionException) t).get_msg()); } + } } - - @Test - public void testGoodHttpGet() throws Exception { - LOG.info("STARTING HTTP GET TEST..."); - Map conf = getConf(0, 0, 0); - try (DRPCServer server = new DRPCServer(conf)) { - server.start(); - //TODO need a better way to do this - Thread.sleep(2000); - try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) { - Future found = exec.submit(() -> GET(server.getHttpServerPort(), "testing", "test")); - DRPCRequest request = getNextAvailableRequest(invoke, "testing"); - assertNotNull(request); - assertEquals("test", request.get_func_args()); - assertNotNull(request.get_request_id()); - invoke.result(request.get_request_id(), "tested"); - String result = found.get(1000, TimeUnit.MILLISECONDS); - assertEquals("tested", result); - } - } + } + + @Test + public void testGoodHttpGet() throws Exception { + LOG.info("STARTING HTTP GET TEST..."); + Map conf = getConf(0, 0, 0); + try (DRPCServer server = new DRPCServer(conf)) { + server.start(); + //TODO need a better way to do this + Thread.sleep(2000); + try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", + server.getDRPCInvokePort())) { + Future found = exec + .submit(() -> GET(server.getHttpServerPort(), "testing", "test")); + DRPCRequest request = getNextAvailableRequest(invoke, "testing"); + assertNotNull(request); + assertEquals("test", request.get_func_args()); + assertNotNull(request.get_request_id()); + invoke.result(request.get_request_id(), "tested"); + String result = found.get(1000, TimeUnit.MILLISECONDS); + assertEquals("tested", result); + } } - - @Test - public void testFailedHttpGet() throws Exception { - LOG.info("STARTING HTTP GET (FAIL) TEST..."); - Map conf = getConf(0, 0, 0); - try (DRPCServer server = new DRPCServer(conf)) { - server.start(); - //TODO need a better way to do this - Thread.sleep(2000); - try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", server.getDRPCInvokePort())) { - Future found = exec.submit(() -> GET(server.getHttpServerPort(), "testing", "test")); - DRPCRequest request = getNextAvailableRequest(invoke, "testing"); - assertNotNull(request); - assertEquals("test", request.get_func_args()); - assertNotNull(request.get_request_id()); - invoke.getClient().failRequest(request.get_request_id()); - try { - found.get(1000, TimeUnit.MILLISECONDS); - fail("exec did not throw an exception"); - } catch (ExecutionException e) { - LOG.warn("Got Expected Exception", e); - //Getting the exact response code is a bit more complex. - //TODO should use a better client - } - } + } + + @Test + public void testFailedHttpGet() throws Exception { + LOG.info("STARTING HTTP GET (FAIL) TEST..."); + Map conf = getConf(0, 0, 0); + try (DRPCServer server = new DRPCServer(conf)) { + server.start(); + //TODO need a better way to do this + Thread.sleep(2000); + try (DRPCInvocationsClient invoke = new DRPCInvocationsClient(conf, "localhost", + server.getDRPCInvokePort())) { + Future found = exec + .submit(() -> GET(server.getHttpServerPort(), "testing", "test")); + DRPCRequest request = getNextAvailableRequest(invoke, "testing"); + assertNotNull(request); + assertEquals("test", request.get_func_args()); + assertNotNull(request.get_request_id()); + invoke.getClient().failRequest(request.get_request_id()); + try { + found.get(1000, TimeUnit.MILLISECONDS); + fail("exec did not throw an exception"); + } catch (ExecutionException e) { + LOG.warn("Got Expected Exception", e); + //Getting the exact response code is a bit more complex. + //TODO should use a better client } + } } + } }