diff --git a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java index 6aa408b7c..3498b834b 100644 --- a/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java +++ b/cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java @@ -35,6 +35,7 @@ import com.automq.rocketmq.cli.topic.DescribeTopic; import com.automq.rocketmq.cli.topic.ListTopic; import com.automq.rocketmq.cli.topic.PrintTopicStats; +import com.automq.rocketmq.cli.topic.ReassignTopic; import com.automq.rocketmq.cli.topic.UpdateTopic; import picocli.CommandLine; @@ -50,6 +51,8 @@ UpdateTopic.class, DeleteTopic.class, ListTopic.class, + PrintTopicStats.class, + ReassignTopic.class, DescribeStream.class, CreateGroup.class, DescribeGroup.class, @@ -60,7 +63,6 @@ ConsumeMessage.class, TerminateNode.class, ResetConsumeOffset.class, - PrintTopicStats.class, ProducerClientConnection.class, ConsumerClientConnection.class } diff --git a/cli/src/main/java/com/automq/rocketmq/cli/topic/ReassignTopic.java b/cli/src/main/java/com/automq/rocketmq/cli/topic/ReassignTopic.java new file mode 100644 index 000000000..c2e3edcf7 --- /dev/null +++ b/cli/src/main/java/com/automq/rocketmq/cli/topic/ReassignTopic.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.cli.topic; + +import apache.rocketmq.controller.v1.Cluster; +import apache.rocketmq.controller.v1.DescribeClusterRequest; +import apache.rocketmq.controller.v1.MessageQueue; +import apache.rocketmq.controller.v1.MessageQueueAssignment; +import apache.rocketmq.controller.v1.Node; +import apache.rocketmq.controller.v1.Topic; +import com.automq.rocketmq.cli.CliClientConfig; +import com.automq.rocketmq.cli.MQAdmin; +import com.automq.rocketmq.controller.ControllerClient; +import com.automq.rocketmq.controller.client.GrpcControllerClient; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Callable; +import picocli.CommandLine; + +@CommandLine.Command(name = "reassignTopic", mixinStandardHelpOptions = true, showDefaultValues = true) +public class ReassignTopic implements Callable { + @CommandLine.ParentCommand + MQAdmin mqAdmin; + + @CommandLine.Option(names = {"-t", "--topicName"}, description = "Topic name", required = true) + String topicName; + + @CommandLine.Option(names = {"-q", "--queueId"}, description = "Queue id, -1 means all queues of given topic", required = true, defaultValue = "-1") + int queueId; + + @CommandLine.Option(names = {"-n", "--nodeId"}, description = "Node ID", required = true) + int nodeId; + + @Override + public Void call() throws Exception { + try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) { + Topic topic = client.describeTopic(mqAdmin.getEndpoint(), null, topicName).join(); + if (null == topic) { + System.err.printf("Topic '%s' is not found%n%n", topicName); + return null; + } + + Cluster cluster = client.describeCluster(mqAdmin.getEndpoint(), DescribeClusterRequest.newBuilder().build()).join(); + Optional optionalNode = cluster.getNodesList().stream().filter(node -> node.getId() == nodeId).findFirst(); + if (optionalNode.isEmpty()) { + System.err.printf("Node %d is not found%n%n", nodeId); + return null; + } + + if (queueId == -1) { + List needToReassignQueueList = topic.getAssignmentsList().stream().filter(assignment -> assignment.getNodeId() != nodeId).toList(); + for (MessageQueueAssignment assignment : needToReassignQueueList) { + MessageQueue queue = assignment.getQueue(); + client.reassignMessageQueue(mqAdmin.getEndpoint(), queue.getTopicId(), queue.getQueueId(), nodeId).join(); + System.out.printf("Reassign queue %d of topic %s from node %d to node %d%n", queue.getQueueId(), topicName, assignment.getNodeId(), nodeId); + } + } else { + Optional optionalAssignment = topic.getAssignmentsList().stream().filter(assignment -> assignment.getQueue().getQueueId() == queueId).findFirst(); + if (optionalAssignment.isEmpty()) { + System.err.printf("Queue %d of topic %s is not found%n%n", queueId, topicName); + return null; + } + MessageQueueAssignment assignment = optionalAssignment.get(); + if (assignment.getNodeId() == nodeId) { + System.out.printf("Queue %d of topic %s is already assigned to node %d%n", queueId, topicName, nodeId); + return null; + } + MessageQueue queue = assignment.getQueue(); + client.reassignMessageQueue(mqAdmin.getEndpoint(), queue.getTopicId(), queue.getQueueId(), nodeId).join(); + System.out.printf("Reassign queue %d of topic %s from node %d to node %d%n", queue.getQueueId(), topicName, assignment.getNodeId(), nodeId); + } + } + return null; + } +} diff --git a/common/src/main/java/com/automq/rocketmq/common/api/DataStore.java b/common/src/main/java/com/automq/rocketmq/common/api/DataStore.java index c7f7a1e98..ebc517ee4 100644 --- a/common/src/main/java/com/automq/rocketmq/common/api/DataStore.java +++ b/common/src/main/java/com/automq/rocketmq/common/api/DataStore.java @@ -22,6 +22,8 @@ public interface DataStore { + CompletableFuture openQueue(long topicId, int queueId); + CompletableFuture closeQueue(long topicId, int queueId); /** diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java index 87c258073..97dfb045c 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java @@ -17,10 +17,10 @@ package com.automq.rocketmq.controller.server.store; +import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.AssignmentStatus; import apache.rocketmq.controller.v1.Cluster; import apache.rocketmq.controller.v1.ClusterSummary; -import apache.rocketmq.common.v1.Code; import apache.rocketmq.controller.v1.ConsumerGroup; import apache.rocketmq.controller.v1.CreateGroupRequest; import apache.rocketmq.controller.v1.CreateTopicRequest; @@ -36,15 +36,24 @@ import com.automq.rocketmq.common.PrefixThreadFactory; import com.automq.rocketmq.common.api.DataStore; import com.automq.rocketmq.common.config.ControllerConfig; -import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.common.exception.ControllerException; +import com.automq.rocketmq.controller.ControllerClient; import com.automq.rocketmq.controller.MetadataStore; import com.automq.rocketmq.controller.server.store.impl.ElectionServiceImpl; import com.automq.rocketmq.controller.server.store.impl.GroupManager; import com.automq.rocketmq.controller.server.store.impl.StreamManager; import com.automq.rocketmq.controller.server.store.impl.TopicManager; +import com.automq.rocketmq.controller.server.tasks.DataRetentionTask; +import com.automq.rocketmq.controller.server.tasks.HeartbeatTask; +import com.automq.rocketmq.controller.server.tasks.ReclaimS3ObjectTask; +import com.automq.rocketmq.controller.server.tasks.RecycleGroupTask; +import com.automq.rocketmq.controller.server.tasks.RecycleTopicTask; +import com.automq.rocketmq.controller.server.tasks.ScanAssignmentTask; import com.automq.rocketmq.controller.server.tasks.ScanGroupTask; +import com.automq.rocketmq.controller.server.tasks.ScanNodeTask; import com.automq.rocketmq.controller.server.tasks.ScanStreamTask; +import com.automq.rocketmq.controller.server.tasks.ScanTopicTask; +import com.automq.rocketmq.controller.server.tasks.ScanYieldingQueueTask; import com.automq.rocketmq.metadata.dao.Group; import com.automq.rocketmq.metadata.dao.GroupProgress; import com.automq.rocketmq.metadata.dao.Lease; @@ -58,16 +67,6 @@ import com.automq.rocketmq.metadata.mapper.NodeMapper; import com.automq.rocketmq.metadata.mapper.QueueAssignmentMapper; import com.automq.rocketmq.metadata.mapper.StreamMapper; -import com.automq.rocketmq.controller.server.tasks.HeartbeatTask; -import com.automq.rocketmq.controller.server.tasks.ReclaimS3ObjectTask; -import com.automq.rocketmq.controller.server.tasks.RecycleGroupTask; -import com.automq.rocketmq.controller.server.tasks.DataRetentionTask; -import com.automq.rocketmq.controller.server.tasks.RecycleTopicTask; -import com.automq.rocketmq.controller.server.tasks.ScanAssignmentTask; -import com.automq.rocketmq.controller.server.tasks.ScanNodeTask; -import com.automq.rocketmq.controller.server.tasks.ScanTopicTask; -import com.automq.rocketmq.controller.server.tasks.ScanYieldingQueueTask; -import com.automq.rocketmq.controller.server.tasks.SchedulerTask; import com.google.common.base.Strings; import com.google.protobuf.Timestamp; import java.io.IOException; @@ -196,8 +195,9 @@ public void start() { config.scanIntervalInSecs(), TimeUnit.SECONDS); this.scheduledExecutorService.scheduleWithFixedDelay(new ScanYieldingQueueTask(this), 1, config.scanIntervalInSecs(), TimeUnit.SECONDS); - this.scheduledExecutorService.scheduleWithFixedDelay(new SchedulerTask(this), 1, - config.balanceWorkloadIntervalInSecs(), TimeUnit.SECONDS); + // Disable balance workload for now +// this.scheduledExecutorService.scheduleWithFixedDelay(new SchedulerTask(this), 1, +// config.balanceWorkloadIntervalInSecs(), TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new HeartbeatTask(this), 3, Math.max(config().nodeAliveIntervalInSecs() / 2, 10), TimeUnit.SECONDS); this.scheduledExecutorService.scheduleWithFixedDelay(new RecycleTopicTask(this), 1, @@ -554,22 +554,54 @@ public CompletableFuture markMessageQueueAssignable(long topicId, int queu for (; ; ) { if (isLeader()) { try (SqlSession session = openSession()) { - QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class); - List assignments = assignmentMapper.list(topicId, null, null, AssignmentStatus.ASSIGNMENT_STATUS_YIELDING, null); - for (QueueAssignment assignment : assignments) { - if (assignment.getQueueId() != queueId) { - continue; - } - assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED); - assignmentMapper.update(assignment); - LOGGER.info("Mark queue[topic-id={}, queue-id={}] assignable", topicId, queueId); - break; + if (!maintainLeadershipWithSharedLock(session)) { + continue; } + + QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class); + QueueAssignment assignment = assignmentMapper.get(topicId, queueId); + assignment.setSrcNodeId(assignment.getDstNodeId()); + assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED); + assignmentMapper.update(assignment); + + StreamMapper streamMapper = session.getMapper(StreamMapper.class); + StreamCriteria criteria = StreamCriteria.newBuilder() + .withTopicId(topicId) + .withQueueId(queueId) + .build(); + streamMapper.updateStreamAssignment(criteria, assignment.getDstNodeId(), assignment.getDstNodeId()); session.commit(); + LOGGER.info("Update status of queue assignment and stream since all its belonging streams are closed," + + " having topic-id={}, queue-id={}", topicId, queueId); + + // Notify the destination node that this queue is assignable + BrokerNode node = nodes.get(assignment.getDstNodeId()); + if (node != null) { + controllerClient.notifyQueueClose(node.getNode().getAddress(), topicId, queueId) + .whenComplete((res, e) -> { + if (null != e) { + future.completeExceptionally(e); + } + future.complete(null); + }); + } else { + future.complete(null); + LOGGER.warn("Node[{}] is not found, can not notify it that topic-id={}, queue-id={} is assigned", assignment.getDstNodeId(), topicId, queueId); + } } - future.complete(null); - break; + return future; } else { + // Check if this node is the leader of the queue + try (SqlSession session = openSession()) { + QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class); + QueueAssignment assignment = assignmentMapper.get(topicId, queueId); + if (assignment.getSrcNodeId() == config.nodeId() && assignment.getStatus() == AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED) { + applyAssignmentChange(List.of(assignment)); + return dataStore.openQueue(topicId, queueId); + } + } + + // Forward to leader Optional leaderAddress = electionService.leaderAddress(); if (leaderAddress.isEmpty()) { return CompletableFuture.failedFuture(new ControllerException(Code.NO_LEADER_VALUE, "No leader is elected yet")); @@ -581,9 +613,9 @@ public CompletableFuture markMessageQueueAssignable(long topicId, int queu future.complete(null); } }); + return future; } } - return future; } @Override @@ -652,20 +684,35 @@ public CompletableFuture onQueueClosed(long topicId, int queueId) { } QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class); QueueAssignment assignment = assignmentMapper.get(topicId, queueId); + assignment.setSrcNodeId(assignment.getDstNodeId()); assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED); assignmentMapper.update(assignment); StreamMapper streamMapper = session.getMapper(StreamMapper.class); + StreamCriteria criteria = StreamCriteria.newBuilder() .withTopicId(topicId) .withQueueId(queueId) - .withState(StreamState.CLOSING) .build(); - streamMapper.updateStreamState(criteria, StreamState.CLOSED); + streamMapper.updateStreamAssignment(criteria, assignment.getDstNodeId(), assignment.getDstNodeId()); session.commit(); LOGGER.info("Update status of queue assignment and stream since all its belonging streams are closed," + " having topic-id={}, queue-id={}", topicId, queueId); - future.complete(null); + + // Notify the destination node that this queue is assignable + BrokerNode node = nodes.get(assignment.getDstNodeId()); + if (node != null) { + controllerClient.notifyQueueClose(node.getNode().getAddress(), topicId, queueId) + .whenComplete((res, e) -> { + if (null != e) { + future.completeExceptionally(e); + } + future.complete(null); + }); + } else { + future.complete(null); + LOGGER.warn("Node[{}] is not found, can not notify it that topic-id={}, queue-id={} is assigned", assignment.getDstNodeId(), topicId, queueId); + } return future; } } else { diff --git a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/LeaseTask.java b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/LeaseTask.java index 3a34946e6..964fcdbc0 100644 --- a/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/LeaseTask.java +++ b/controller/src/main/java/com/automq/rocketmq/controller/server/tasks/LeaseTask.java @@ -21,9 +21,9 @@ import com.automq.rocketmq.common.exception.ControllerException; import com.automq.rocketmq.controller.MetadataStore; import com.automq.rocketmq.controller.server.store.BrokerNode; +import com.automq.rocketmq.metadata.dao.Lease; import com.automq.rocketmq.metadata.dao.Node; import com.automq.rocketmq.metadata.mapper.LeaseMapper; -import com.automq.rocketmq.metadata.dao.Lease; import com.automq.rocketmq.metadata.mapper.NodeMapper; import java.util.Calendar; import org.apache.ibatis.session.SqlSession; @@ -75,7 +75,7 @@ public void process() throws ControllerException { session.commit(); metadataStore.electionService().updateLease(update); } else { - LOGGER.info("Node[id={}, epoch={}] is controller leader currently, expiring at {}", + LOGGER.debug("Node[id={}, epoch={}] is controller leader currently, expiring at {}", lease.getNodeId(), lease.getEpoch(), lease.getExpirationTime()); } } else { diff --git a/controller/src/test/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStoreTest.java b/controller/src/test/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStoreTest.java index ce6585d81..c8ca820c3 100644 --- a/controller/src/test/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStoreTest.java +++ b/controller/src/test/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStoreTest.java @@ -1259,9 +1259,15 @@ public void testOnQueueClose() throws IOException { metadataStore.onQueueClosed(2, 1).join(); - StreamMapper mapper = session.getMapper(StreamMapper.class); - Stream stream = mapper.getByStreamId(streamId); - assertEquals(stream.getState(), StreamState.CLOSED); + QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class); + QueueAssignment assignment = assignmentMapper.get(2, 1); + assertEquals(4, assignment.getDstNodeId()); + assertEquals(4, assignment.getDstNodeId()); + + StreamMapper streamMapper = session.getMapper(StreamMapper.class); + Stream stream = streamMapper.getByStreamId(streamId); + assertEquals(4, stream.getDstNodeId()); + assertEquals(4, stream.getDstNodeId()); } } diff --git a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/StreamMapper.java b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/StreamMapper.java index 2c20511fb..137a35522 100644 --- a/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/StreamMapper.java +++ b/metadata-jdbc/src/main/java/com/automq/rocketmq/metadata/mapper/StreamMapper.java @@ -21,9 +21,8 @@ import apache.rocketmq.controller.v1.StreamState; import com.automq.rocketmq.metadata.dao.Stream; import com.automq.rocketmq.metadata.dao.StreamCriteria; -import org.apache.ibatis.annotations.Param; - import java.util.List; +import org.apache.ibatis.annotations.Param; public interface StreamMapper { @@ -42,6 +41,10 @@ public interface StreamMapper { int updateStreamState(@Param("criteria") StreamCriteria criteria, @Param("state") StreamState state); + int updateStreamAssignment(@Param("criteria") StreamCriteria criteria, + @Param("srcNodeId") int src, + @Param("dstNodeId") int dst); + int delete(@Param("id") Long id); List byCriteria(@Param("criteria") StreamCriteria criteria); @@ -51,7 +54,6 @@ public interface StreamMapper { long queueEpoch(@Param("topicId") long topicId, @Param("queueId") long queueId); - int planMove(@Param("criteria") StreamCriteria criteria, @Param("srcNodeId") int src, @Param("dstNodeId") int dst, diff --git a/metadata-jdbc/src/main/resources/database/mapper/StreamMapper.xml b/metadata-jdbc/src/main/resources/database/mapper/StreamMapper.xml index 181bcbc82..86a8d2193 100644 --- a/metadata-jdbc/src/main/resources/database/mapper/StreamMapper.xml +++ b/metadata-jdbc/src/main/resources/database/mapper/StreamMapper.xml @@ -102,6 +102,22 @@ + + UPDATE stream + SET src_node_id = #{srcNodeId}, + dst_node_id = #{dstNodeId} + + + + #{item} + + + AND topic_id = #{criteria.topicId} + AND queue_id = #{criteria.queueId} + AND state = #{criteria.state} + + + UPDATE stream SET src_node_id = #{srcNodeId}, diff --git a/store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java b/store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java index 217dd73e1..19df23792 100644 --- a/store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java +++ b/store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java @@ -21,6 +21,7 @@ import com.automq.rocketmq.store.api.LogicQueueManager; import com.automq.rocketmq.store.api.S3ObjectOperator; import com.automq.rocketmq.store.api.StreamStore; +import com.automq.rocketmq.store.model.StoreContext; import com.google.common.base.Stopwatch; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -43,6 +44,13 @@ public DataStoreFacade(StreamStore streamStore, S3ObjectOperator s3ObjectOperato this.logicQueueManager = logicQueueManager; } + @Override + public CompletableFuture openQueue(long topicId, int queueId) { + return logicQueueManager.getOrCreate(StoreContext.EMPTY, topicId, queueId) + .thenAccept(ignore -> { + }); + } + @Override public CompletableFuture closeQueue(long topicId, int queueId) { return logicQueueManager.close(topicId, queueId);