From 8108036ce5b826f35f0be3ae1b9addb90c9ce0ca Mon Sep 17 00:00:00 2001 From: Li Zhanhui Date: Thu, 2 Nov 2023 18:26:42 +0800 Subject: [PATCH] fix: set retention period when create topic and add test case for topic creation Signed-off-by: Li Zhanhui --- .../exception/ControllerException.java | 4 -- .../server/store/DefaultMetadataStore.java | 2 +- .../server/store/impl/TopicManager.java | 5 +- .../controller/ControllerServiceImplTest.java | 47 +++++++++++++++++++ .../controller/GrpcControllerClientTest.java | 1 + 5 files changed, 52 insertions(+), 7 deletions(-) diff --git a/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerException.java b/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerException.java index 967aa6d08..2d5fa4116 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerException.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/exception/ControllerException.java @@ -20,10 +20,6 @@ import com.automq.rocketmq.common.exception.RocketMQException; public class ControllerException extends RocketMQException { - public ControllerException(int errorCode) { - super(errorCode); - } - public ControllerException(int errorCode, String message) { super(errorCode, message); } diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java index cf06f1a1d..24dc4e714 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java @@ -392,7 +392,7 @@ public boolean hasAliveBrokerNodes() { public String leaderAddress() throws ControllerException { if (null == lease || lease.expired()) { LOGGER.error("No lease is populated yet or lease was expired"); - throw new ControllerException(Code.NO_LEADER_VALUE); + throw new ControllerException(Code.NO_LEADER_VALUE, "No leader node is available"); } BrokerNode brokerNode = nodes.get(this.lease.getNodeId()); diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java index 89dea5a8f..0577f694e 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/impl/TopicManager.java @@ -132,7 +132,6 @@ public CompletableFuture createTopic(CreateTopicRequest request) { if (!metadataStore.maintainLeadershipWithSharedLock(session)) { continue; } - TopicMapper topicMapper = session.getMapper(TopicMapper.class); if (null != topicMapper.get(null, request.getTopic())) { ControllerException e = new ControllerException(Code.DUPLICATED_VALUE, @@ -146,6 +145,7 @@ public CompletableFuture createTopic(CreateTopicRequest request) { topic.setQueueNum(request.getCount()); topic.setStatus(TopicStatus.TOPIC_STATUS_ACTIVE); topic.setAcceptMessageTypes(JsonFormat.printer().print(request.getAcceptTypes())); + topic.setRetentionHours(request.getRetentionHours()); topicMapper.create(topic); long topicId = topic.getId(); List assignments = createQueues(IntStream.range(0, request.getCount()), @@ -160,6 +160,7 @@ public CompletableFuture createTopic(CreateTopicRequest request) { } catch (ControllerException | InvalidProtocolBufferException e) { future.completeExceptionally(e); } + return future; } else { try { metadataStore.controllerClient().createTopic(metadataStore.leaderAddress(), request).whenComplete((res, e) -> { @@ -468,7 +469,7 @@ private List createQueues(IntStream range, long topicId, .toList(); if (aliveNodes.isEmpty()) { LOGGER.warn("0 of {} broker nodes is alive", metadataStore.allNodes().size()); - throw new ControllerException(Code.NODE_UNAVAILABLE_VALUE); + throw new ControllerException(Code.NODE_UNAVAILABLE_VALUE, "No broker node is available"); } List assignments = new ArrayList<>(); diff --git a/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java index b98b5b3a7..09e3b34ba 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/ControllerServiceImplTest.java @@ -26,6 +26,7 @@ import apache.rocketmq.controller.v1.ControllerServiceGrpc; import apache.rocketmq.controller.v1.CreateGroupReply; import apache.rocketmq.controller.v1.CreateGroupRequest; +import apache.rocketmq.controller.v1.CreateTopicReply; import apache.rocketmq.controller.v1.CreateTopicRequest; import apache.rocketmq.controller.v1.DeleteTopicReply; import apache.rocketmq.controller.v1.DeleteTopicRequest; @@ -215,6 +216,51 @@ public void testHeartbeatGrpc() throws IOException { } } + @Test + public void testCreateTopic() throws IOException { + AcceptTypes acceptTypes = AcceptTypes.newBuilder() + .addTypes(MessageType.NORMAL) + .addTypes(MessageType.DELAY) + .build(); + ControllerClient controllerClient = Mockito.mock(ControllerClient.class); + try (DefaultMetadataStore metadataStore = new DefaultMetadataStore(controllerClient, getSessionFactory(), config)) { + metadataStore.start(); + Node node = new Node(); + node.setId(config.nodeId()); + node.setName(config.name()); + metadataStore.addBrokerNode(node); + Awaitility.await().with().pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(10, TimeUnit.SECONDS) + .until(metadataStore::isLeader); + try (ControllerTestServer testServer = new ControllerTestServer(0, new ControllerServiceImpl(metadataStore))) { + testServer.start(); + int port = testServer.getPort(); + ManagedChannel channel = Grpc.newChannelBuilderForAddress("localhost", port, InsecureChannelCredentials.create()).build(); + ControllerServiceGrpc.ControllerServiceBlockingStub blockingStub = ControllerServiceGrpc.newBlockingStub(channel); + CreateTopicReply reply = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS) + .createTopic(CreateTopicRequest.newBuilder() + .setTopic("T1") + .setRetentionHours(1) + .setAcceptTypes(acceptTypes) + .setCount(4) + .build()); + + Assertions.assertEquals(Code.OK, reply.getStatus().getCode()); + try (SqlSession session = getSessionFactory().openSession()) { + TopicMapper topicMapper = session.getMapper(TopicMapper.class); + Topic topic = topicMapper.get(reply.getTopicId(), null); + Assertions.assertEquals("T1", topic.getName()); + Assertions.assertEquals(1, topic.getRetentionHours()); + Assertions.assertEquals(4, topic.getQueueNum()); + AcceptTypes.Builder builder = AcceptTypes.newBuilder(); + JsonFormat.parser().merge(topic.getAcceptMessageTypes(), builder); + Assertions.assertEquals(acceptTypes, builder.build()); + } + channel.shutdownNow(); + } + } + } + @Test public void testListTopics() throws IOException { AcceptTypes acceptTypes = AcceptTypes.newBuilder() @@ -962,6 +1008,7 @@ public void testListOpenStreams() throws IOException, ExecutionException, Interr } } } + @Test public void testCreateTopic_OpenStream_CloseStream() throws IOException, ExecutionException, InterruptedException, ControllerException { ControllerClient controllerClient = Mockito.mock(ControllerClient.class); diff --git a/controller/src/test/java/com/automq/rocketmq/controller/GrpcControllerClientTest.java b/controller/src/test/java/com/automq/rocketmq/controller/GrpcControllerClientTest.java index 7110f531d..7166b45ae 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/GrpcControllerClientTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/GrpcControllerClientTest.java @@ -137,6 +137,7 @@ public void testCreateTopic() throws ControllerException, IOException, Execution CreateTopicRequest request = CreateTopicRequest.newBuilder() .setTopic(topicName) .setCount(queueNum) + .setRetentionHours(1) .setAcceptTypes(AcceptTypes.newBuilder() .addTypes(MessageType.NORMAL) .addTypes(MessageType.FIFO)