Skip to content

Commit c792ef8

Browse files
authored
[PIP 95][Issue 12040][web] Topic lookup with listener header (#12072)
Master Issue: #12040 ### Motivation This PR introduces an optional HTTP header `X-Pulsar-ListenerName` to the `TopicLookup` operation of the Pulsar REST API. The header supplies the listener name to use when the broker has multiple advertised listeners. Today the `TopicLookup` operation accepts a query parameter `listenerName` for that same purpose. The motivation for adding a header-based alternative is to improve support smart listener selection via HTTP gateways or proxies that are capable of rewriting headers (see: [Istio VirtualService](https://istio.io/latest/docs/reference/config/networking/virtual-service/#Headers)). See #12040 for more background. ### Modifications - Modify `TopicLookup` operation (V1 + V2) to use a header (query string takes precedence).
1 parent 64c4b14 commit c792ef8

File tree

6 files changed

+223
-6
lines changed

6 files changed

+223
-6
lines changed

pulsar-broker/pom.xml

+14
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,20 @@
207207
<artifactId>jersey-media-json-jackson</artifactId>
208208
</dependency>
209209

210+
<dependency>
211+
<groupId>org.glassfish.jersey.test-framework</groupId>
212+
<artifactId>jersey-test-framework-core</artifactId>
213+
<scope>test</scope>
214+
<version>${jersey.version}</version>
215+
</dependency>
216+
217+
<dependency>
218+
<groupId>org.glassfish.jersey.test-framework.providers</groupId>
219+
<artifactId>jersey-test-framework-provider-grizzly2</artifactId>
220+
<scope>test</scope>
221+
<version>${jersey.version}</version>
222+
</dependency>
223+
210224
<dependency>
211225
<groupId>jakarta.activation</groupId>
212226
<artifactId>jakarta.activation-api</artifactId>

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@
2323
import javax.ws.rs.DefaultValue;
2424
import javax.ws.rs.Encoded;
2525
import javax.ws.rs.GET;
26+
import javax.ws.rs.HeaderParam;
2627
import javax.ws.rs.Path;
2728
import javax.ws.rs.PathParam;
2829
import javax.ws.rs.Produces;
2930
import javax.ws.rs.QueryParam;
3031
import javax.ws.rs.container.AsyncResponse;
3132
import javax.ws.rs.container.Suspended;
3233
import javax.ws.rs.core.MediaType;
34+
import org.apache.commons.lang3.StringUtils;
3335
import org.apache.pulsar.broker.lookup.TopicLookupBase;
3436
import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
3537
import org.apache.pulsar.common.naming.TopicName;
@@ -48,6 +50,8 @@
4850
@NoSwaggerDocumentation
4951
public class TopicLookup extends TopicLookupBase {
5052

53+
static final String LISTENERNAME_HEADER = "X-Pulsar-ListenerName";
54+
5155
@GET
5256
@Path("{topic-domain}/{property}/{cluster}/{namespace}/{topic}")
5357
@Produces(MediaType.APPLICATION_JSON)
@@ -58,8 +62,12 @@ public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @Pat
5862
@PathParam("topic") @Encoded String encodedTopic,
5963
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
6064
@Suspended AsyncResponse asyncResponse,
61-
@QueryParam("listenerName") String listenerName) {
65+
@QueryParam("listenerName") String listenerName,
66+
@HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
6267
TopicName topicName = getTopicName(topicDomain, property, cluster, namespace, encodedTopic);
68+
if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
69+
listenerName = listenerNameHeader;
70+
}
6371
internalLookupTopicAsync(topicName, authoritative, asyncResponse, listenerName);
6472
}
6573

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,23 @@
2323
import javax.ws.rs.DefaultValue;
2424
import javax.ws.rs.Encoded;
2525
import javax.ws.rs.GET;
26+
import javax.ws.rs.HeaderParam;
2627
import javax.ws.rs.Path;
2728
import javax.ws.rs.PathParam;
2829
import javax.ws.rs.Produces;
2930
import javax.ws.rs.QueryParam;
3031
import javax.ws.rs.container.AsyncResponse;
3132
import javax.ws.rs.container.Suspended;
3233
import javax.ws.rs.core.MediaType;
34+
import org.apache.commons.lang3.StringUtils;
3335
import org.apache.pulsar.broker.lookup.TopicLookupBase;
3436
import org.apache.pulsar.common.naming.TopicName;
3537

3638
@Path("/v2/topic")
3739
public class TopicLookup extends TopicLookupBase {
3840

41+
static final String LISTENERNAME_HEADER = "X-Pulsar-ListenerName";
42+
3943
@GET
4044
@Path("{topic-domain}/{tenant}/{namespace}/{topic}")
4145
@Produces(MediaType.APPLICATION_JSON)
@@ -45,8 +49,12 @@ public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @Pat
4549
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
4650
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
4751
@Suspended AsyncResponse asyncResponse,
48-
@QueryParam("listenerName") String listenerName) {
52+
@QueryParam("listenerName") String listenerName,
53+
@HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
4954
TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);
55+
if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
56+
listenerName = listenerNameHeader;
57+
}
5058
internalLookupTopicAsync(topicName, authoritative, asyncResponse, listenerName);
5159
}
5260

pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public void crossColoLookup() throws Exception {
116116

117117
AsyncResponse asyncResponse = mock(AsyncResponse.class);
118118
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
119-
asyncResponse, null);
119+
asyncResponse, null, null);
120120

121121
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
122122
verify(asyncResponse).resume(arg.capture());
@@ -146,7 +146,7 @@ public void testNotEnoughLookupPermits() throws Exception {
146146

147147
AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
148148
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
149-
asyncResponse1, null);
149+
asyncResponse1, null, null);
150150

151151
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
152152
verify(asyncResponse1).resume(arg.capture());
@@ -182,15 +182,15 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception {
182182

183183
AsyncResponse asyncResponse = mock(AsyncResponse.class);
184184
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster",
185-
false, asyncResponse, null);
185+
false, asyncResponse, null, null);
186186

187187
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
188188
verify(asyncResponse).resume(arg.capture());
189189
assertEquals(arg.getValue().getClass(), RestException.class);
190190

191191
AsyncResponse asyncResponse2 = mock(AsyncResponse.class);
192192
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns2,
193-
"invalid-localCluster", false, asyncResponse2, null);
193+
"invalid-localCluster", false, asyncResponse2, null, null);
194194
ArgumentCaptor<Throwable> arg2 = ArgumentCaptor.forClass(Throwable.class);
195195
verify(asyncResponse2).resume(arg2.capture());
196196

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.lookup.http.v2;
20+
21+
import javax.ws.rs.container.AsyncResponse;
22+
import javax.ws.rs.core.Response;
23+
import org.apache.pulsar.broker.lookup.v2.TopicLookup;
24+
import org.apache.pulsar.broker.web.PulsarWebResourceTest;
25+
import org.apache.pulsar.common.lookup.data.LookupData;
26+
import org.apache.pulsar.common.naming.TopicName;
27+
import org.glassfish.jersey.server.ResourceConfig;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
import org.testng.annotations.AfterMethod;
31+
import org.testng.annotations.BeforeMethod;
32+
import org.testng.annotations.Test;
33+
34+
import static org.mockito.Mockito.spy;
35+
import static org.testng.Assert.assertEquals;
36+
37+
/**
38+
* TopicLookup V2 API unit tests.
39+
*/
40+
@Test(groups = "broker")
41+
public class TopicLookupTest extends PulsarWebResourceTest {
42+
43+
private static final String TOPIC_PATH = "/v2/topic/persistent/public/testns/testtopic";
44+
45+
private TestableTopicLookup resource;
46+
47+
@Override
48+
protected ResourceConfig configure() {
49+
resource = spy(new TestableTopicLookup());
50+
return new ResourceConfig().register(resource);
51+
}
52+
53+
@Test
54+
public void testListenerName() {
55+
Response response;
56+
// verify query param
57+
response = target(TOPIC_PATH).queryParam("listenerName", "query").request().get();
58+
assertEquals(response.getStatus(), 200);
59+
assertEquals(resource.actualListenerName, "query");
60+
61+
// verify header param
62+
response = target(TOPIC_PATH).request().header("X-Pulsar-ListenerName", "header").get();
63+
assertEquals(response.getStatus(), 200);
64+
assertEquals(resource.actualListenerName, "header");
65+
66+
// verify that query param supersedes the header param
67+
response = target(TOPIC_PATH).queryParam("listenerName", "query")
68+
.request().header("X-Pulsar-ListenerName", "header").get();
69+
assertEquals(response.getStatus(), 200);
70+
assertEquals(resource.actualListenerName, "query");
71+
}
72+
73+
private static class TestableTopicLookup extends TopicLookup {
74+
private String actualListenerName;
75+
76+
@Override
77+
protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative, AsyncResponse asyncResponse,
78+
String listenerName) {
79+
this.actualListenerName = listenerName;
80+
asyncResponse.resume(new LookupData());
81+
}
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.web;
20+
21+
import javax.servlet.ServletContext;
22+
import javax.ws.rs.core.Context;
23+
import javax.ws.rs.core.Feature;
24+
import javax.ws.rs.core.FeatureContext;
25+
import org.apache.pulsar.broker.PulsarService;
26+
import org.apache.pulsar.broker.ServiceConfiguration;
27+
import org.glassfish.jersey.server.ResourceConfig;
28+
import org.glassfish.jersey.servlet.ServletContainer;
29+
import org.glassfish.jersey.test.DeploymentContext;
30+
import org.glassfish.jersey.test.JerseyTestNg;
31+
import org.glassfish.jersey.test.ServletDeploymentContext;
32+
import org.glassfish.jersey.test.TestProperties;
33+
import org.glassfish.jersey.test.grizzly.GrizzlyWebTestContainerFactory;
34+
import org.glassfish.jersey.test.spi.TestContainerException;
35+
import org.glassfish.jersey.test.spi.TestContainerFactory;
36+
import org.testng.annotations.AfterClass;
37+
import org.testng.annotations.BeforeClass;
38+
39+
import static org.mockito.Mockito.doReturn;
40+
import static org.mockito.Mockito.mock;
41+
42+
/**
43+
* A base class for testing subclasses of {@link PulsarWebResource}.
44+
*/
45+
public abstract class PulsarWebResourceTest extends JerseyTestNg.ContainerPerClassTest {
46+
47+
protected ServiceConfiguration config;
48+
protected PulsarService pulsar;
49+
50+
protected PulsarWebResourceTest() {
51+
config = new ServiceConfiguration();
52+
53+
pulsar = mock(PulsarService.class);
54+
doReturn(config).when(pulsar).getConfig();
55+
doReturn(config).when(pulsar).getConfiguration();
56+
57+
set(TestProperties.CONTAINER_PORT, 0);
58+
}
59+
60+
@BeforeClass(alwaysRun = true)
61+
@Override
62+
public void setUp() throws Exception {
63+
super.setUp();
64+
}
65+
66+
@AfterClass(alwaysRun = true)
67+
@Override
68+
public void tearDown() throws Exception {
69+
super.tearDown();
70+
}
71+
72+
/**
73+
* Creates a JAX-RS resource configuration for test purposes.
74+
*/
75+
@Override
76+
protected abstract ResourceConfig configure();
77+
78+
/**
79+
* Creates a test container factory with servlet support.
80+
*/
81+
@Override
82+
protected TestContainerFactory getTestContainerFactory() throws TestContainerException {
83+
return new GrizzlyWebTestContainerFactory();
84+
}
85+
86+
/**
87+
* Configures a deployment context for JAX-RS.
88+
*/
89+
@Override
90+
protected DeploymentContext configureDeployment() {
91+
ResourceConfig app = configure();
92+
app.register(new Feature() {
93+
@Context
94+
ServletContext servletContext;
95+
96+
@Override
97+
public boolean configure(FeatureContext context) {
98+
servletContext.setAttribute(WebService.ATTRIBUTE_PULSAR_NAME, pulsar);
99+
return true;
100+
}
101+
});
102+
return ServletDeploymentContext.forServlet(new ServletContainer(app)).build();
103+
}
104+
}

0 commit comments

Comments
 (0)