Skip to content

Commit

Permalink
change tenant name to make tests more clear (apache#54)
Browse files Browse the repository at this point in the history
Provides a tenant name in tests to make the tests clearer.
  • Loading branch information
jiazhai authored and sijie committed Dec 26, 2019
1 parent a96d0a4 commit 2436cf1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/streamnative/kop/KafkaRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ protected CompletableFuture<ResponseAndRequest> handleSaslAuthenticate(KafkaHead
Map<String, Set<AuthAction>> permissions = getAdmin()
.namespaces().getPermissions(saslAuth.getUsername());
if (!permissions.containsKey(authRole)) {
throw new AuthorizationException("Not allowed on this namespace");
throw new AuthorizationException("Role: " + authRole + " Not allowed on this namespace");
}

log.debug("successfully authenticate user " + authRole);
Expand Down
21 changes: 12 additions & 9 deletions src/test/java/io/streamnative/kop/SaslPlainTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Properties;
import javax.crypto.SecretKey;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -46,14 +47,16 @@
* Testing the SASL-PLAIN features on KoP.
*/
@Test
@Slf4j
public class SaslPlainTest extends MockKafkaServiceBaseTest {

private static final String SIMPLE_USER = "muggle_user";
private static final String TENANT = "testTenant";
private static final String ANOTHER_USER = "death_eater_user";
private static final String ADMIN_USER = "admin_user";
private static final String NAMESPACE = "ns1";
private static final String KAFKA_TOPIC = "topic1";
private static final String PULSAR_TOPIC_NAME = "persistent://" + SIMPLE_USER
private static final String PULSAR_TOPIC_NAME = "persistent://" + TENANT
+ "/" + NAMESPACE + "/" + KAFKA_TOPIC;
private static final String CLUSTER_NAME = "c1";
private String adminToken;
Expand Down Expand Up @@ -99,12 +102,12 @@ protected void setup() throws Exception {
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(AuthenticationToken.class.getName(), "token:" + adminToken).build());

getAdmin().tenants().createTenant(SIMPLE_USER,
getAdmin().tenants().createTenant(TENANT,
new TenantInfo(Sets.newHashSet(ADMIN_USER), Sets.newHashSet(CLUSTER_NAME)));
getAdmin().namespaces().createNamespace(SIMPLE_USER + "/" + NAMESPACE);
getAdmin().namespaces().createNamespace(TENANT + "/" + NAMESPACE);
getAdmin().topics().createPartitionedTopic(PULSAR_TOPIC_NAME, 1);
getAdmin()
.namespaces().grantPermissionOnNamespace(SIMPLE_USER + "/" + NAMESPACE, SIMPLE_USER,
.namespaces().grantPermissionOnNamespace(TENANT + "/" + NAMESPACE, SIMPLE_USER,
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
}

Expand All @@ -115,7 +118,7 @@ protected void cleanup() throws Exception {
@Test(timeOut = 40000)
void simpleProduceAndConsume() throws Exception {
KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(),
SIMPLE_USER + "/" + NAMESPACE, "token:" + userToken);
TENANT + "/" + NAMESPACE, "token:" + userToken);
int totalMsgs = 10;
String messageStrPrefix = PULSAR_TOPIC_NAME + "_message_";

Expand All @@ -125,7 +128,7 @@ void simpleProduceAndConsume() throws Exception {
}

KConsumer kConsumer = new KConsumer(KAFKA_TOPIC, "localhost", getKafkaBrokerPort(), false,
SIMPLE_USER + "/" + NAMESPACE, "token:" + userToken, "DemoKafkaOnPulsarConsumer");
TENANT + "/" + NAMESPACE, "token:" + userToken, "DemoKafkaOnPulsarConsumer");
kConsumer.getConsumer().subscribe(Collections.singleton(KAFKA_TOPIC));

int i = 0;
Expand Down Expand Up @@ -156,7 +159,7 @@ void badCredentialFail() throws Exception {
try {
@Cleanup
KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(),
SIMPLE_USER + "/" + NAMESPACE, "token:dsa");
TENANT + "/" + NAMESPACE, "token:dsa");
kProducer.getProducer().send(new ProducerRecord<>(KAFKA_TOPIC, 0, "")).get();
fail("should have failed");
} catch (Exception e) {
Expand All @@ -169,7 +172,7 @@ void badUserFail() throws Exception {
try {
@Cleanup
KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(),
SIMPLE_USER + "/" + NAMESPACE, "token:" + anotherToken);
TENANT + "/" + NAMESPACE, "token:" + anotherToken);
kProducer.getProducer().send(new ProducerRecord<>(KAFKA_TOPIC, 0, "")).get();
fail("should have failed");
} catch (Exception e) {
Expand All @@ -181,7 +184,7 @@ void badUserFail() throws Exception {
void badNamespaceProvided() throws Exception {
try {
KProducer kProducer = new KProducer(KAFKA_TOPIC, false, "localhost", getKafkaBrokerPort(),
SIMPLE_USER + "/ns2", "token:" + userToken);
TENANT + "/ns2", "token:" + userToken);
kProducer.getProducer().send(new ProducerRecord<>(KAFKA_TOPIC, 0, "")).get();
fail("should have failed");
} catch (Exception e) {
Expand Down

0 comments on commit 2436cf1

Please sign in to comment.