Skip to content

Commit

Permalink
fix: set retention period when create topic and add test case for top…
Browse files Browse the repository at this point in the history
…ic creation

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui committed Nov 2, 2023
1 parent 5c2da57 commit 8108036
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
import com.automq.rocketmq.common.exception.RocketMQException;

public class ControllerException extends RocketMQException {
public ControllerException(int errorCode) {
super(errorCode);
}

public ControllerException(int errorCode, String message) {
super(errorCode, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public boolean hasAliveBrokerNodes() {
public String leaderAddress() throws ControllerException {
if (null == lease || lease.expired()) {
LOGGER.error("No lease is populated yet or lease was expired");
throw new ControllerException(Code.NO_LEADER_VALUE);
throw new ControllerException(Code.NO_LEADER_VALUE, "No leader node is available");
}

BrokerNode brokerNode = nodes.get(this.lease.getNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public CompletableFuture<Long> createTopic(CreateTopicRequest request) {
if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
continue;
}

TopicMapper topicMapper = session.getMapper(TopicMapper.class);
if (null != topicMapper.get(null, request.getTopic())) {
ControllerException e = new ControllerException(Code.DUPLICATED_VALUE,
Expand All @@ -146,6 +145,7 @@ public CompletableFuture<Long> createTopic(CreateTopicRequest request) {
topic.setQueueNum(request.getCount());
topic.setStatus(TopicStatus.TOPIC_STATUS_ACTIVE);
topic.setAcceptMessageTypes(JsonFormat.printer().print(request.getAcceptTypes()));
topic.setRetentionHours(request.getRetentionHours());
topicMapper.create(topic);
long topicId = topic.getId();
List<QueueAssignment> assignments = createQueues(IntStream.range(0, request.getCount()),
Expand All @@ -160,6 +160,7 @@ public CompletableFuture<Long> createTopic(CreateTopicRequest request) {
} catch (ControllerException | InvalidProtocolBufferException e) {
future.completeExceptionally(e);
}
return future;
} else {
try {
metadataStore.controllerClient().createTopic(metadataStore.leaderAddress(), request).whenComplete((res, e) -> {
Expand Down Expand Up @@ -468,7 +469,7 @@ private List<QueueAssignment> createQueues(IntStream range, long topicId,
.toList();
if (aliveNodes.isEmpty()) {
LOGGER.warn("0 of {} broker nodes is alive", metadataStore.allNodes().size());
throw new ControllerException(Code.NODE_UNAVAILABLE_VALUE);
throw new ControllerException(Code.NODE_UNAVAILABLE_VALUE, "No broker node is available");
}

List<QueueAssignment> assignments = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import apache.rocketmq.controller.v1.ControllerServiceGrpc;
import apache.rocketmq.controller.v1.CreateGroupReply;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicReply;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.DeleteTopicReply;
import apache.rocketmq.controller.v1.DeleteTopicRequest;
Expand Down Expand Up @@ -215,6 +216,51 @@ public void testHeartbeatGrpc() throws IOException {
}
}

@Test
public void testCreateTopic() throws IOException {
AcceptTypes acceptTypes = AcceptTypes.newBuilder()
.addTypes(MessageType.NORMAL)
.addTypes(MessageType.DELAY)
.build();
ControllerClient controllerClient = Mockito.mock(ControllerClient.class);
try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(controllerClient, getSessionFactory(), config)) {
metadataStore.start();
Node node = new Node();
node.setId(config.nodeId());
node.setName(config.name());
metadataStore.addBrokerNode(node);
Awaitility.await().with().pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(10, TimeUnit.SECONDS)
.until(metadataStore::isLeader);
try (ControllerTestServer testServer = new ControllerTestServer(0, new ControllerServiceImpl(metadataStore))) {
testServer.start();
int port = testServer.getPort();
ManagedChannel channel = Grpc.newChannelBuilderForAddress("localhost", port, InsecureChannelCredentials.create()).build();
ControllerServiceGrpc.ControllerServiceBlockingStub blockingStub = ControllerServiceGrpc.newBlockingStub(channel);
CreateTopicReply reply = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.createTopic(CreateTopicRequest.newBuilder()
.setTopic("T1")
.setRetentionHours(1)
.setAcceptTypes(acceptTypes)
.setCount(4)
.build());

Assertions.assertEquals(Code.OK, reply.getStatus().getCode());
try (SqlSession session = getSessionFactory().openSession()) {
TopicMapper topicMapper = session.getMapper(TopicMapper.class);
Topic topic = topicMapper.get(reply.getTopicId(), null);
Assertions.assertEquals("T1", topic.getName());
Assertions.assertEquals(1, topic.getRetentionHours());
Assertions.assertEquals(4, topic.getQueueNum());
AcceptTypes.Builder builder = AcceptTypes.newBuilder();
JsonFormat.parser().merge(topic.getAcceptMessageTypes(), builder);
Assertions.assertEquals(acceptTypes, builder.build());
}
channel.shutdownNow();
}
}
}

@Test
public void testListTopics() throws IOException {
AcceptTypes acceptTypes = AcceptTypes.newBuilder()
Expand Down Expand Up @@ -962,6 +1008,7 @@ public void testListOpenStreams() throws IOException, ExecutionException, Interr
}
}
}

@Test
public void testCreateTopic_OpenStream_CloseStream() throws IOException, ExecutionException, InterruptedException, ControllerException {
ControllerClient controllerClient = Mockito.mock(ControllerClient.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void testCreateTopic() throws ControllerException, IOException, Execution
CreateTopicRequest request = CreateTopicRequest.newBuilder()
.setTopic(topicName)
.setCount(queueNum)
.setRetentionHours(1)
.setAcceptTypes(AcceptTypes.newBuilder()
.addTypes(MessageType.NORMAL)
.addTypes(MessageType.FIFO)
Expand Down

0 comments on commit 8108036

Please sign in to comment.