From fab65f519f744b983bd6dfbb5b5e70dd91861266 Mon Sep 17 00:00:00 2001 From: liudezhi <33149602+liudezhi2098@users.noreply.github.com> Date: Sun, 16 Jan 2022 14:19:49 +0800 Subject: [PATCH 1/5] fix_topic_produced_through_rest_not_support_Authorization --- .../main/java/org/apache/pulsar/broker/rest/TopicsBase.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java index 1df2c14bc9c94..f89abf9bea30d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java @@ -756,7 +756,8 @@ && pulsar().getBrokerService().isAuthorizationEnabled()) { } boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .canProduce(topicName, originalPrincipal(), clientAuthData()); + .canProduce(topicName, originalPrincipal() == null ? clientAppId() : originalPrincipal(), + clientAuthData()); if (!isAuthorized) { throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to produce to topic %s" + " with clientAppId [%s] and authdata %s", topicName.toString(), From c48e2a17b16d3db554235a254c2a0a48c0cb2ed6 Mon Sep 17 00:00:00 2001 From: liudezhi <33149602+liudezhi2098@users.noreply.github.com> Date: Mon, 17 Jan 2022 11:20:41 +0800 Subject: [PATCH 2/5] add test for this TopicsAuthTest --- .../pulsar/broker/admin/TopicsAuthTest.java | 244 ++++++++++++++++++ 1 file changed, 244 insertions(+) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java new file mode 100644 index 0000000000000..116e4d00a5c1b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Sets; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import java.util.Base64; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import javax.crypto.SecretKey; +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; +import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.broker.rest.Topics; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.auth.AuthenticationToken; +import org.apache.pulsar.client.impl.schema.StringSchema; +import org.apache.pulsar.common.policies.data.AuthAction; +import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.websocket.data.ProducerMessage; +import org.apache.pulsar.websocket.data.ProducerMessages; +import org.glassfish.jersey.client.ClientConfig; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.media.multipart.MultiPartFeature; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.spy; + +public class TopicsAuthTest extends MockedPulsarServiceBaseTest { + + + private Topics topics; + private final String testLocalCluster = "test"; + private final String testTenant = "my-tenant"; + private final String testNamespace = "my-namespace"; + private final String testTopicName = "my-topic"; + + private static final SecretKey SECRET_KEY = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + private static final String ADMIN_TOKEN = Jwts.builder().setSubject("admin").signWith(SECRET_KEY).compact(); + private static final String PRODUCE_TOKEN = Jwts.builder().setSubject("producer").signWith(SECRET_KEY).compact(); + private static final String CONSUME_TOKEN = Jwts.builder().setSubject("consumer").signWith(SECRET_KEY).compact(); + + @Override + @BeforeMethod + protected void setup() throws Exception { + // enable auth&auth and use JWT at broker + conf.setAuthenticationEnabled(true); + conf.setAuthorizationEnabled(true); + conf.getProperties().setProperty("tokenSecretKey", "data:;base64," + + Base64.getEncoder().encodeToString(SECRET_KEY.getEncoded())); + Set superUserRoles = new HashSet<>(); + superUserRoles.add("admin"); + conf.setSuperUserRoles(superUserRoles); + Set providers = new HashSet<>(); + providers.add(AuthenticationProviderToken.class.getName()); + conf.setAuthenticationProviders(providers); + super.internalSetup(); + topics = spy(new Topics()); + topics.setPulsar(pulsar); + PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null + ? brokerUrl.toString() : brokerUrlTls.toString()) + .authentication(AuthenticationToken.class.getName(), + ADMIN_TOKEN); + admin = Mockito.spy(pulsarAdminBuilder.build()); + admin.clusters().createCluster(testLocalCluster, new ClusterDataImpl()); + admin.tenants().createTenant(testTenant, new TenantInfoImpl(Sets.newHashSet("role1", "role2"), + Sets.newHashSet(testLocalCluster))); + admin.namespaces().createNamespace(testTenant + "/" + testNamespace, + Sets.newHashSet(testLocalCluster)); + admin.namespaces().grantPermissionOnNamespace(testTenant + "/" + testNamespace, "producer", + EnumSet.of(AuthAction.produce)); + admin.namespaces().grantPermissionOnNamespace(testTenant + "/" + testNamespace, "consumer", + EnumSet.of(AuthAction.consume)); + } + + @Override + @AfterMethod + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider(name = "variations") + public static Object[][] variations() { + return new Object[][]{ + {CONSUME_TOKEN, 401}, + {PRODUCE_TOKEN, 200} + }; + } + + @Test(dataProvider = "variations") + public void testProduceToNonPartitionedTopic(String token, int status) throws Exception { + admin.topics().createNonPartitionedTopic("persistent://" + testTenant + "/" + + testNamespace + "/" + testTopicName); + AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class); + Schema schema = StringSchema.utf8(); + ProducerMessages producerMessages = new ProducerMessages(); + producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). + writeValueAsString(schema.getSchemaInfo())); + producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal(). + writeValueAsString(schema.getSchemaInfo())); + String message = "[" + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; + producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message, + new TypeReference>() { + })); + + WebTarget root = buildWebClient(); + Response response = root.path("/topics/persistent/" + testTenant + "/" + testNamespace + "/" + + testTopicName).request(MediaType.APPLICATION_JSON) + .header("Authorization", "Bearer " + token) + .post(Entity.json(producerMessages)); + Assert.assertEquals(response.getStatus(), status); + } + + @Test(dataProvider = "variations") + public void testProduceToPartitionedTopic(String token, int status) throws Exception { + admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" + + testNamespace + "/" + testTopicName, 5); + AsyncResponse asyncResponse = mock(AsyncResponse.class); + Schema schema = StringSchema.utf8(); + ProducerMessages producerMessages = new ProducerMessages(); + producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). + writeValueAsString(schema.getSchemaInfo())); + producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal(). + writeValueAsString(schema.getSchemaInfo())); + String message = "[" + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; + producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message, + new TypeReference>() { + })); + + WebTarget root = buildWebClient(); + Response response = root.path("/topics/persistent/" + testTenant + "/" + testNamespace + "/" + testTopicName + + "/partitions/2") + .request(MediaType.APPLICATION_JSON) + .header("Authorization", "Bearer " + token) + .post(Entity.json(producerMessages)); + Assert.assertEquals(response.getStatus(), status); + } + + @Test(dataProvider = "variations") + public void testProduceOnNonPersistentNonPartitionedTopic(String token, int status) throws Exception { + admin.topics().createNonPartitionedTopic("non-persistent://" + testTenant + "/" + + testNamespace + "/" + testTopicName); + AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class); + Schema schema = StringSchema.utf8(); + ProducerMessages producerMessages = new ProducerMessages(); + producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). + writeValueAsString(schema.getSchemaInfo())); + producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal(). + writeValueAsString(schema.getSchemaInfo())); + String message = "[" + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; + producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message, + new TypeReference>() { + })); + + WebTarget root = buildWebClient(); + Response response = root.path("/topics/non-persistent/" + testTenant + "/" + testNamespace + "/" + + testTopicName).request(MediaType.APPLICATION_JSON) + .header("Authorization", "Bearer " + token) + .post(Entity.json(producerMessages)); + Assert.assertEquals(response.getStatus(), status); + } + + @Test(dataProvider = "variations") + public void testProduceOnNonPersistentPartitionedTopic(String token, int status) throws Exception { + admin.topics().createPartitionedTopic("non-persistent://" + testTenant + "/" + + testNamespace + "/" + testTopicName, 5); + AsyncResponse asyncResponse = mock(AsyncResponse.class); + Schema schema = StringSchema.utf8(); + ProducerMessages producerMessages = new ProducerMessages(); + producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). + writeValueAsString(schema.getSchemaInfo())); + producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal(). + writeValueAsString(schema.getSchemaInfo())); + String message = "[" + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," + + "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; + producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message, + new TypeReference>() { + })); + + WebTarget root = buildWebClient(); + Response response = root.path("/topics/non-persistent/" + testTenant + "/" + testNamespace + "/" + testTopicName + + "/partitions/2") + .request(MediaType.APPLICATION_JSON) + .header("Authorization", "Bearer " + token) + .post(Entity.json(producerMessages)); + Assert.assertEquals(response.getStatus(), status); + } + + WebTarget buildWebClient() throws Exception { + ClientConfig httpConfig = new ClientConfig(); + httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true); + httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8); + httpConfig.register(MultiPartFeature.class); + + javax.ws.rs.client.ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(httpConfig); + Client client = clientBuilder.build(); + return client.target(brokerUrl.toString()); + } + +} From 72e7f442228194fa1208becd6f9b4a0c65e8a669 Mon Sep 17 00:00:00 2001 From: liudezhi <33149602+liudezhi2098@users.noreply.github.com> Date: Thu, 20 Jan 2022 10:26:02 +0800 Subject: [PATCH 3/5] Remove unneeded code --- .../java/org/apache/pulsar/broker/admin/TopicsAuthTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java index 116e4d00a5c1b..7e7d4f9c17fab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java @@ -128,7 +128,6 @@ public static Object[][] variations() { public void testProduceToNonPartitionedTopic(String token, int status) throws Exception { admin.topics().createNonPartitionedTopic("persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName); - AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class); Schema schema = StringSchema.utf8(); ProducerMessages producerMessages = new ProducerMessages(); producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). @@ -154,7 +153,6 @@ public void testProduceToNonPartitionedTopic(String token, int status) throws Ex public void testProduceToPartitionedTopic(String token, int status) throws Exception { admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName, 5); - AsyncResponse asyncResponse = mock(AsyncResponse.class); Schema schema = StringSchema.utf8(); ProducerMessages producerMessages = new ProducerMessages(); producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). @@ -181,7 +179,6 @@ public void testProduceToPartitionedTopic(String token, int status) throws Excep public void testProduceOnNonPersistentNonPartitionedTopic(String token, int status) throws Exception { admin.topics().createNonPartitionedTopic("non-persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName); - AsyncResponse asyncResponse = PowerMockito.mock(AsyncResponse.class); Schema schema = StringSchema.utf8(); ProducerMessages producerMessages = new ProducerMessages(); producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). @@ -207,7 +204,6 @@ public void testProduceOnNonPersistentNonPartitionedTopic(String token, int stat public void testProduceOnNonPersistentPartitionedTopic(String token, int status) throws Exception { admin.topics().createPartitionedTopic("non-persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName, 5); - AsyncResponse asyncResponse = mock(AsyncResponse.class); Schema schema = StringSchema.utf8(); ProducerMessages producerMessages = new ProducerMessages(); producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). From 1cd968a278235737857db6fc416c732ffb4f7f09 Mon Sep 17 00:00:00 2001 From: liudezhi <33149602+liudezhi2098@users.noreply.github.com> Date: Thu, 20 Jan 2022 10:29:16 +0800 Subject: [PATCH 4/5] Remove unneeded code --- .../java/org/apache/pulsar/broker/admin/TopicsAuthTest.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java index 7e7d4f9c17fab..683a661a47690 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java @@ -65,8 +65,6 @@ public class TopicsAuthTest extends MockedPulsarServiceBaseTest { - - private Topics topics; private final String testLocalCluster = "test"; private final String testTenant = "my-tenant"; private final String testNamespace = "my-namespace"; @@ -92,8 +90,6 @@ protected void setup() throws Exception { providers.add(AuthenticationProviderToken.class.getName()); conf.setAuthenticationProviders(providers); super.internalSetup(); - topics = spy(new Topics()); - topics.setPulsar(pulsar); PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder().serviceHttpUrl(brokerUrl != null ? brokerUrl.toString() : brokerUrlTls.toString()) .authentication(AuthenticationToken.class.getName(), From 8cfbafde18422e4ed6f6ce91061bfdd9e99435ed Mon Sep 17 00:00:00 2001 From: liudezhi <33149602+liudezhi2098@users.noreply.github.com> Date: Thu, 20 Jan 2022 11:21:06 +0800 Subject: [PATCH 5/5] reuse the same code block --- .../pulsar/broker/admin/TopicsAuthTest.java | 103 +++++------------- 1 file changed, 30 insertions(+), 73 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java index 683a661a47690..185053ed7029b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsAuthTest.java @@ -32,13 +32,11 @@ import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; -import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationProviderToken; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; -import org.apache.pulsar.broker.rest.Topics; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.Schema; @@ -54,14 +52,11 @@ import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.media.multipart.MultiPartFeature; import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.spy; public class TopicsAuthTest extends MockedPulsarServiceBaseTest { @@ -122,84 +117,39 @@ public static Object[][] variations() { @Test(dataProvider = "variations") public void testProduceToNonPartitionedTopic(String token, int status) throws Exception { - admin.topics().createNonPartitionedTopic("persistent://" + testTenant + "/" - + testNamespace + "/" + testTopicName); - Schema schema = StringSchema.utf8(); - ProducerMessages producerMessages = new ProducerMessages(); - producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). - writeValueAsString(schema.getSchemaInfo())); - producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal(). - writeValueAsString(schema.getSchemaInfo())); - String message = "[" + - "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," + - "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; - producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message, - new TypeReference>() { - })); - - WebTarget root = buildWebClient(); - Response response = root.path("/topics/persistent/" + testTenant + "/" + testNamespace + "/" - + testTopicName).request(MediaType.APPLICATION_JSON) - .header("Authorization", "Bearer " + token) - .post(Entity.json(producerMessages)); - Assert.assertEquals(response.getStatus(), status); + innerTestProduce(testTopicName, true, false, token, status); } @Test(dataProvider = "variations") public void testProduceToPartitionedTopic(String token, int status) throws Exception { - admin.topics().createPartitionedTopic("persistent://" + testTenant + "/" - + testNamespace + "/" + testTopicName, 5); - Schema schema = StringSchema.utf8(); - ProducerMessages producerMessages = new ProducerMessages(); - producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). - writeValueAsString(schema.getSchemaInfo())); - producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal(). - writeValueAsString(schema.getSchemaInfo())); - String message = "[" + - "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," + - "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; - producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message, - new TypeReference>() { - })); - - WebTarget root = buildWebClient(); - Response response = root.path("/topics/persistent/" + testTenant + "/" + testNamespace + "/" + testTopicName - + "/partitions/2") - .request(MediaType.APPLICATION_JSON) - .header("Authorization", "Bearer " + token) - .post(Entity.json(producerMessages)); - Assert.assertEquals(response.getStatus(), status); + innerTestProduce(testTopicName, true, true, token, status); } @Test(dataProvider = "variations") public void testProduceOnNonPersistentNonPartitionedTopic(String token, int status) throws Exception { - admin.topics().createNonPartitionedTopic("non-persistent://" + testTenant + "/" - + testNamespace + "/" + testTopicName); - Schema schema = StringSchema.utf8(); - ProducerMessages producerMessages = new ProducerMessages(); - producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). - writeValueAsString(schema.getSchemaInfo())); - producerMessages.setValueSchema(ObjectMapperFactory.getThreadLocal(). - writeValueAsString(schema.getSchemaInfo())); - String message = "[" + - "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1}," + - "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; - producerMessages.setMessages(ObjectMapperFactory.getThreadLocal().readValue(message, - new TypeReference>() { - })); - - WebTarget root = buildWebClient(); - Response response = root.path("/topics/non-persistent/" + testTenant + "/" + testNamespace + "/" - + testTopicName).request(MediaType.APPLICATION_JSON) - .header("Authorization", "Bearer " + token) - .post(Entity.json(producerMessages)); - Assert.assertEquals(response.getStatus(), status); + innerTestProduce(testTopicName, false, false, token, status); } @Test(dataProvider = "variations") public void testProduceOnNonPersistentPartitionedTopic(String token, int status) throws Exception { - admin.topics().createPartitionedTopic("non-persistent://" + testTenant + "/" - + testNamespace + "/" + testTopicName, 5); + innerTestProduce(testTopicName, false, true, token, status); + } + + private void innerTestProduce(String createTopicName, boolean isPersistent, boolean isPartition, + String token, int status) throws Exception { + String topicPrefix = null; + if (isPersistent == true) { + topicPrefix = "persistent"; + } else { + topicPrefix = "non-persistent"; + } + if (isPartition == true) { + admin.topics().createPartitionedTopic(topicPrefix + "://" + testTenant + "/" + + testNamespace + "/" + createTopicName, 5); + } else { + admin.topics().createNonPartitionedTopic(topicPrefix + "://" + testTenant + "/" + + testNamespace + "/" + createTopicName); + } Schema schema = StringSchema.utf8(); ProducerMessages producerMessages = new ProducerMessages(); producerMessages.setKeySchema(ObjectMapperFactory.getThreadLocal(). @@ -214,8 +164,15 @@ public void testProduceOnNonPersistentPartitionedTopic(String token, int status) })); WebTarget root = buildWebClient(); - Response response = root.path("/topics/non-persistent/" + testTenant + "/" + testNamespace + "/" + testTopicName - + "/partitions/2") + String requestPath = null; + if (isPartition == true) { + requestPath = "/topics/" + topicPrefix + "/" + testTenant + "/" + testNamespace + "/" + + createTopicName + "/partitions/2"; + } else { + requestPath = "/topics/" + topicPrefix + "/" + testTenant + "/" + testNamespace + "/" + createTopicName; + } + + Response response = root.path(requestPath) .request(MediaType.APPLICATION_JSON) .header("Authorization", "Bearer " + token) .post(Entity.json(producerMessages));