Skip to content

Commit

Permalink
add access test for two client
Browse files Browse the repository at this point in the history
Signed-off-by: lyx <1419360299@qq.com>
  • Loading branch information
lyx2000 committed Jul 9, 2023
1 parent 6bbea49 commit 02ed4c7
Show file tree
Hide file tree
Showing 5 changed files with 366 additions and 0 deletions.
7 changes: 7 additions & 0 deletions acl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,12 @@
<artifactId>spring-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq-client-java.version}</version>
<scope>test</scope>
</dependency>

</dependencies>
</project>
156 changes: 156 additions & 0 deletions acl/src/test/java/org/apache/rocketmq/acl/GrpcClientAccessTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;

import apache.rocketmq.v2.Message;
import apache.rocketmq.v2.MessageQueue;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Resource;
import apache.rocketmq.v2.SendMessageRequest;
import java.io.File;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.AuthenticationHeader;
import org.apache.rocketmq.acl.plain.AclTestHelper;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.java.misc.ClientId;
import org.apache.rocketmq.client.java.rpc.Signature;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.shaded.io.grpc.Metadata;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class GrpcClientAccessTest {

private PlainAccessValidator plainAccessValidator;

private File confHome;

AuthenticationHeader authenticationHeader;

@Before
public void init() throws IOException, NoSuchAlgorithmException, InvalidKeyException {
String folder = "conf";
confHome = AclTestHelper.copyResources(folder, true);
System.setProperty("rocketmq.home.dir", confHome.getAbsolutePath());
plainAccessValidator = new PlainAccessValidator();

String accessKey = "rocketmq3";
String secretKey = "12345678";
String endpoint = "127.0.0.1:8081";
String clientAddress = "10.7.1.3";
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setCredentialProvider(sessionCredentialsProvider)
.setEndpoints(endpoint)
.build();
Metadata metadata = Signature.sign(clientConfiguration, new ClientId());

authenticationHeader = AuthenticationHeader.builder()
.authorization(metadata.get(Metadata.Key.of(Signature.AUTHORIZATION_KEY, Metadata.ASCII_STRING_MARSHALLER)))
.datetime(metadata.get(Metadata.Key.of(Signature.DATE_TIME_KEY, Metadata.ASCII_STRING_MARSHALLER)))
.remoteAddress(clientAddress)
.build();
}

@After
public void cleanUp() {
AclTestHelper.recursiveDelete(confHome);
}

@Test(expected = AclException.class)
public void testProduceDenyTopic() {
authenticationHeader.setRequestCode(RequestCode.SEND_MESSAGE_V2);

PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
getMockSendMessageRequest("topicD"), authenticationHeader);
plainAccessValidator.validate(accessResource);

}

@Test
public void testProduceAuthorizedTopic() {
authenticationHeader.setRequestCode(RequestCode.SEND_MESSAGE_V2);

PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
getMockSendMessageRequest("topicA"), authenticationHeader);
plainAccessValidator.validate(accessResource);
}

private SendMessageRequest getMockSendMessageRequest(String topic) {
return SendMessageRequest.newBuilder()
.addMessages(Message.newBuilder()
.setTopic(Resource.newBuilder()
.setName(topic)))
.build();
}

@Test(expected = AclException.class)
public void testConsumeDenyTopic() {
authenticationHeader.setRequestCode(RequestCode.PULL_MESSAGE);

PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
getMockReceiveMessageRequest("topicD", "groupB"), authenticationHeader);
plainAccessValidator.validate(accessResource);

}

@Test
public void testConsumeAuthorizedTopic() {
authenticationHeader.setRequestCode(RequestCode.PULL_MESSAGE);

PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
getMockReceiveMessageRequest("topicB", "groupB"), authenticationHeader);
plainAccessValidator.validate(accessResource);
}

@Test(expected = AclException.class)
public void testConsumeInDeniedGroup() {
authenticationHeader.setRequestCode(RequestCode.PULL_MESSAGE);

PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
getMockReceiveMessageRequest("topicB", "groupD"), authenticationHeader);
plainAccessValidator.validate(accessResource);
}

@Test
public void testConsumeInAuthorizedGroup() {
authenticationHeader.setRequestCode(RequestCode.PULL_MESSAGE);

PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(
getMockReceiveMessageRequest("topicB", "groupB"), authenticationHeader);
plainAccessValidator.validate(accessResource);
}

ReceiveMessageRequest getMockReceiveMessageRequest(String topic, String group) {
return ReceiveMessageRequest.newBuilder()
.setGroup(Resource.newBuilder()
.setName(group))
.setMessageQueue(MessageQueue.newBuilder().setTopic(Resource.newBuilder()
.setName(topic)))
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.acl;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.acl.plain.AclTestHelper;
import org.apache.rocketmq.acl.plain.PlainAccessResource;
import org.apache.rocketmq.acl.plain.PlainAccessValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RemotingClientAccessTest {

private PlainAccessValidator plainAccessValidator;
private AclClientRPCHook aclClient;
private SessionCredentials sessionCredentials;

private File confHome;

private String clientAddress = "10.7.1.3";

@Before
public void init() throws IOException {
String folder = "conf";
confHome = AclTestHelper.copyResources(folder, true);
System.setProperty("rocketmq.home.dir", confHome.getAbsolutePath());
plainAccessValidator = new PlainAccessValidator();
sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("rocketmq3");
sessionCredentials.setSecretKey("12345678");
aclClient = new AclClientRPCHook(sessionCredentials);
}

@After
public void cleanUp() {
AclTestHelper.recursiveDelete(confHome);
}

@Test(expected = AclException.class)
public void testProduceDenyTopic() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicD");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest(clientAddress, remotingCommand);

ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress);
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
Assert.fail("Should not throw IOException");
}
}

@Test
public void testProduceAuthorizedTopic() {
SendMessageRequestHeader messageRequestHeader = new SendMessageRequestHeader();
messageRequestHeader.setTopic("topicA");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, messageRequestHeader);
aclClient.doBeforeRequest(clientAddress, remotingCommand);

ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), clientAddress);
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
Assert.fail("Should not throw IOException");
}
}


@Test(expected = AclException.class)
public void testConsumeDenyTopic() {
PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
pullMessageRequestHeader.setTopic("topicD");
pullMessageRequestHeader.setConsumerGroup("groupB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
Assert.fail("Should not throw IOException");
}

}

@Test
public void testConsumeAuthorizedTopic() {
PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
pullMessageRequestHeader.setTopic("topicB");
pullMessageRequestHeader.setConsumerGroup("groupB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
Assert.fail("Should not throw IOException");
}
}

@Test(expected = AclException.class)
public void testConsumeInDeniedGroup() {
PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
pullMessageRequestHeader.setTopic("topicB");
pullMessageRequestHeader.setConsumerGroup("groupD");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
Assert.fail("Should not throw IOException");
}
}

@Test
public void testConsumeInAuthorizedGroup() {
PullMessageRequestHeader pullMessageRequestHeader = new PullMessageRequestHeader();
pullMessageRequestHeader.setTopic("topicB");
pullMessageRequestHeader.setConsumerGroup("groupB");
RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, pullMessageRequestHeader);
aclClient.doBeforeRequest("", remotingCommand);
ByteBuffer buf = remotingCommand.encodeHeader();
buf.getInt();
buf = ByteBuffer.allocate(buf.limit() - buf.position()).put(buf);
buf.position(0);
try {
PlainAccessResource accessResource = (PlainAccessResource) plainAccessValidator.parse(RemotingCommand.decode(buf), "123.4.5.6");
plainAccessValidator.validate(accessResource);
} catch (RemotingCommandException e) {
e.printStackTrace();
Assert.fail("Should not throw IOException");
}
}

}
15 changes: 15 additions & 0 deletions acl/src/test/resources/conf/acl/plain_acl.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,18 @@ accounts:
# if it is admin, it could access all resources
admin: true

- accessKey: rocketmq3
secretKey: 12345678
admin: false
defaultTopicPerm: DENY
defaultGroupPerm: DENY
topicPerms:
- topicA=PUB
- topicB=SUB
- topicC=PUB|SUB
- topicD=DENY
groupPerms:
- groupB=SUB
- groupC=PUB|SUB
- groupD=DENY

1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
<awaitility.version>4.1.0</awaitility.version>
<truth.version>0.30</truth.version>
<s3mock-junit4.version>2.11.0</s3mock-junit4.version>
<rocketmq-client-java.version>5.0.5</rocketmq-client-java.version>

<!-- Build plugin dependencies -->
<versions-maven-plugin.version>2.2</versions-maven-plugin.version>
Expand Down

0 comments on commit 02ed4c7

Please sign in to comment.