Skip to content

Commit

Permalink
KAFKA-3424: Add CORS support to Connect REST API
Browse files Browse the repository at this point in the history
Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Gwen Shapira

Closes #1099 from ewencp/cors-rest-support
  • Loading branch information
ewencp authored and gwenshap committed Mar 20, 2016
1 parent 6553679 commit eb82328
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 1 deletion.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ project(':connect:runtime') {
compile libs.jerseyContainerServlet
compile libs.jettyServer
compile libs.jettyServlet
compile libs.jettyServlets
compile libs.reflections

testCompile project(':clients').sourceSets.test.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ public class WorkerConfig extends AbstractConfig {
private static final String REST_ADVERTISED_PORT_DOC
= "If this is set, this is the port that will be given out to other workers to connect to.";

public static final String ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG = "access.control.allow.origin";
protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DOC =
"Value to set the Access-Control-Allow-Origin header to for REST API requests." +
"To enable cross origin access, set this to the domain of the application that should be permitted" +
" to access the API, or '*' to allow access from any domain. The default value only allows access" +
" from the domain of the REST API.";
protected static final String ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT = "";


/**
* Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
* bootstrap their own ConfigDef.
Expand Down Expand Up @@ -129,7 +138,10 @@ protected static ConfigDef baseConfigDef() {
.define(REST_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_HOST_NAME_DOC)
.define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
.define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, null, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC)
.define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC);
.define(REST_ADVERTISED_PORT_CONFIG, Type.INT, null, Importance.LOW, REST_ADVERTISED_PORT_DOC)
.define(ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, Type.STRING,
ACCESS_CONTROL_ALLOW_ORIGIN_DEFAULT, Importance.LOW,
ACCESS_CONTROL_ALLOW_ORIGIN_DOC);
}

public WorkerConfig(ConfigDef definition, Map<String, String> props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.CrossOriginFilter;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
Expand All @@ -52,9 +54,11 @@
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;

import javax.servlet.DispatcherType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder;

Expand Down Expand Up @@ -109,6 +113,14 @@ public void start(Herder herder) {
context.setContextPath("/");
context.addServlet(servletHolder, "/*");

String allowedOrigins = config.getString(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG);
if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter());
filterHolder.setName("cross-origin");
filterHolder.setInitParameter("allowedOrigins", allowedOrigins);
context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
}

RequestLogHandler requestLogHandler = new RequestLogHandler();
Slf4jRequestLog requestLog = new Slf4jRequestLog();
requestLog.setLoggerName(RestServer.class.getCanonicalName());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.runtime.rest;

import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.util.Callback;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.modules.junit4.PowerMockRunner;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static org.junit.Assert.assertEquals;

@RunWith(PowerMockRunner.class)
public class RestServerTest {

@MockStrict
private Herder herder;
private RestServer server;

@After
public void tearDown() {
server.stop();
}

private Map<String, String> baseWorkerProps() {
Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
workerProps.put("internal.key.converter.schemas.enable", "false");
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
return workerProps;
}

@Test
public void testCORSEnabled() {
checkCORSRequest("*", "http://bar.com", "http://bar.com");
}

@Test
public void testCORSDisabled() {
checkCORSRequest("", "http://bar.com", null);
}

public void checkCORSRequest(String corsDomain, String origin, String expectedHeader) {
// To be able to set the Origin, we need to toggle this flag
System.setProperty("sun.net.http.allowRestrictedHeaders", "true");

final Capture<Callback<Collection<String>>> connectorsCallback = EasyMock.newCapture();
herder.connectors(EasyMock.capture(connectorsCallback));
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
connectorsCallback.getValue().onCompletion(null, Arrays.asList("a", "b"));
return null;
}
});
PowerMock.replayAll();

Map<String, String> workerProps = baseWorkerProps();
workerProps.put(WorkerConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG, corsDomain);
WorkerConfig workerConfig = new StandaloneConfig(workerProps);
server = new RestServer(workerConfig);
server.start(herder);

Response response = request("/connectors")
.header("Referer", origin + "/page")
.header("Origin", origin)
.get();
assertEquals(200, response.getStatus());

assertEquals(expectedHeader, response.getHeaderString("Access-Control-Allow-Origin"));
PowerMock.verifyAll();
}

protected Invocation.Builder request(String path) {
return request(path, null, null, null);
}

protected Invocation.Builder request(String path, Map<String, String> queryParams) {
return request(path, null, null, queryParams);
}

protected Invocation.Builder request(String path, String templateName, Object templateValue) {
return request(path, templateName, templateValue, null);
}

protected Invocation.Builder request(String path, String templateName, Object templateValue,
Map<String, String> queryParams) {
Client client = ClientBuilder.newClient();
WebTarget target;
URI pathUri = null;
try {
pathUri = new URI(path);
} catch (URISyntaxException e) {
// Ignore, use restConnect and assume this is a valid path part
}
if (pathUri != null && pathUri.isAbsolute()) {
target = client.target(path);
} else {
target = client.target(server.advertisedUrl()).path(path);
}
if (templateName != null && templateValue != null) {
target = target.resolveTemplate(templateName, templateValue);
}
if (queryParams != null) {
for (Map.Entry<String, String> queryParam : queryParams.entrySet()) {
target = target.queryParam(queryParam.getKey(), queryParam.getValue());
}
}
return target.request();
}
}
1 change: 1 addition & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ libs += [
jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty",
jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty",
jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty",
jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey",
junit: "junit:junit:$versions.junit",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
Expand Down

0 comments on commit eb82328

Please sign in to comment.