Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): reassign topic #818

Merged
merged 2 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.automq.rocketmq.cli.topic.DescribeTopic;
import com.automq.rocketmq.cli.topic.ListTopic;
import com.automq.rocketmq.cli.topic.PrintTopicStats;
import com.automq.rocketmq.cli.topic.ReassignTopic;
import com.automq.rocketmq.cli.topic.UpdateTopic;
import picocli.CommandLine;

Expand All @@ -50,6 +51,8 @@
UpdateTopic.class,
DeleteTopic.class,
ListTopic.class,
PrintTopicStats.class,
ReassignTopic.class,
DescribeStream.class,
CreateGroup.class,
DescribeGroup.class,
Expand All @@ -60,7 +63,6 @@
ConsumeMessage.class,
TerminateNode.class,
ResetConsumeOffset.class,
PrintTopicStats.class,
ProducerClientConnection.class,
ConsumerClientConnection.class
}
Expand Down
90 changes: 90 additions & 0 deletions cli/src/main/java/com/automq/rocketmq/cli/topic/ReassignTopic.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.cli.topic;

import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.DescribeClusterRequest;
import apache.rocketmq.controller.v1.MessageQueue;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.Node;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.cli.CliClientConfig;
import com.automq.rocketmq.cli.MQAdmin;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.client.GrpcControllerClient;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import picocli.CommandLine;

@CommandLine.Command(name = "reassignTopic", mixinStandardHelpOptions = true, showDefaultValues = true)
public class ReassignTopic implements Callable<Void> {
@CommandLine.ParentCommand
MQAdmin mqAdmin;

@CommandLine.Option(names = {"-t", "--topicName"}, description = "Topic name", required = true)
String topicName;

@CommandLine.Option(names = {"-q", "--queueId"}, description = "Queue id, -1 means all queues of given topic", required = true, defaultValue = "-1")
int queueId;

@CommandLine.Option(names = {"-n", "--nodeId"}, description = "Node ID", required = true)
int nodeId;

@Override
public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
Topic topic = client.describeTopic(mqAdmin.getEndpoint(), null, topicName).join();
if (null == topic) {
System.err.printf("Topic '%s' is not found%n%n", topicName);
return null;
}

Cluster cluster = client.describeCluster(mqAdmin.getEndpoint(), DescribeClusterRequest.newBuilder().build()).join();
Optional<Node> optionalNode = cluster.getNodesList().stream().filter(node -> node.getId() == nodeId).findFirst();
if (optionalNode.isEmpty()) {
System.err.printf("Node %d is not found%n%n", nodeId);
return null;
}

if (queueId == -1) {
List<MessageQueueAssignment> needToReassignQueueList = topic.getAssignmentsList().stream().filter(assignment -> assignment.getNodeId() != nodeId).toList();
for (MessageQueueAssignment assignment : needToReassignQueueList) {
MessageQueue queue = assignment.getQueue();
client.reassignMessageQueue(mqAdmin.getEndpoint(), queue.getTopicId(), queue.getQueueId(), nodeId).join();
System.out.printf("Reassign queue %d of topic %s from node %d to node %d%n", queue.getQueueId(), topicName, assignment.getNodeId(), nodeId);
}
} else {
Optional<MessageQueueAssignment> optionalAssignment = topic.getAssignmentsList().stream().filter(assignment -> assignment.getQueue().getQueueId() == queueId).findFirst();
if (optionalAssignment.isEmpty()) {
System.err.printf("Queue %d of topic %s is not found%n%n", queueId, topicName);
return null;
}
MessageQueueAssignment assignment = optionalAssignment.get();
if (assignment.getNodeId() == nodeId) {
System.out.printf("Queue %d of topic %s is already assigned to node %d%n", queueId, topicName, nodeId);
return null;
}
MessageQueue queue = assignment.getQueue();
client.reassignMessageQueue(mqAdmin.getEndpoint(), queue.getTopicId(), queue.getQueueId(), nodeId).join();
System.out.printf("Reassign queue %d of topic %s from node %d to node %d%n", queue.getQueueId(), topicName, assignment.getNodeId(), nodeId);
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public interface DataStore {

CompletableFuture<Void> openQueue(long topicId, int queueId);

CompletableFuture<Void> closeQueue(long topicId, int queueId);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package com.automq.rocketmq.controller.server.store;

import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.AssignmentStatus;
import apache.rocketmq.controller.v1.Cluster;
import apache.rocketmq.controller.v1.ClusterSummary;
import apache.rocketmq.common.v1.Code;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateGroupRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
Expand All @@ -36,15 +36,24 @@
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.ControllerConfig;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.ControllerClient;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.store.impl.ElectionServiceImpl;
import com.automq.rocketmq.controller.server.store.impl.GroupManager;
import com.automq.rocketmq.controller.server.store.impl.StreamManager;
import com.automq.rocketmq.controller.server.store.impl.TopicManager;
import com.automq.rocketmq.controller.server.tasks.DataRetentionTask;
import com.automq.rocketmq.controller.server.tasks.HeartbeatTask;
import com.automq.rocketmq.controller.server.tasks.ReclaimS3ObjectTask;
import com.automq.rocketmq.controller.server.tasks.RecycleGroupTask;
import com.automq.rocketmq.controller.server.tasks.RecycleTopicTask;
import com.automq.rocketmq.controller.server.tasks.ScanAssignmentTask;
import com.automq.rocketmq.controller.server.tasks.ScanGroupTask;
import com.automq.rocketmq.controller.server.tasks.ScanNodeTask;
import com.automq.rocketmq.controller.server.tasks.ScanStreamTask;
import com.automq.rocketmq.controller.server.tasks.ScanTopicTask;
import com.automq.rocketmq.controller.server.tasks.ScanYieldingQueueTask;
import com.automq.rocketmq.metadata.dao.Group;
import com.automq.rocketmq.metadata.dao.GroupProgress;
import com.automq.rocketmq.metadata.dao.Lease;
Expand All @@ -58,16 +67,6 @@
import com.automq.rocketmq.metadata.mapper.NodeMapper;
import com.automq.rocketmq.metadata.mapper.QueueAssignmentMapper;
import com.automq.rocketmq.metadata.mapper.StreamMapper;
import com.automq.rocketmq.controller.server.tasks.HeartbeatTask;
import com.automq.rocketmq.controller.server.tasks.ReclaimS3ObjectTask;
import com.automq.rocketmq.controller.server.tasks.RecycleGroupTask;
import com.automq.rocketmq.controller.server.tasks.DataRetentionTask;
import com.automq.rocketmq.controller.server.tasks.RecycleTopicTask;
import com.automq.rocketmq.controller.server.tasks.ScanAssignmentTask;
import com.automq.rocketmq.controller.server.tasks.ScanNodeTask;
import com.automq.rocketmq.controller.server.tasks.ScanTopicTask;
import com.automq.rocketmq.controller.server.tasks.ScanYieldingQueueTask;
import com.automq.rocketmq.controller.server.tasks.SchedulerTask;
import com.google.common.base.Strings;
import com.google.protobuf.Timestamp;
import java.io.IOException;
Expand Down Expand Up @@ -196,8 +195,9 @@
config.scanIntervalInSecs(), TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleWithFixedDelay(new ScanYieldingQueueTask(this), 1,
config.scanIntervalInSecs(), TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleWithFixedDelay(new SchedulerTask(this), 1,
config.balanceWorkloadIntervalInSecs(), TimeUnit.SECONDS);
// Disable balance workload for now
// this.scheduledExecutorService.scheduleWithFixedDelay(new SchedulerTask(this), 1,
// config.balanceWorkloadIntervalInSecs(), TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new HeartbeatTask(this), 3,
Math.max(config().nodeAliveIntervalInSecs() / 2, 10), TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleWithFixedDelay(new RecycleTopicTask(this), 1,
Expand Down Expand Up @@ -554,22 +554,54 @@
for (; ; ) {
if (isLeader()) {
try (SqlSession session = openSession()) {
QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
List<QueueAssignment> assignments = assignmentMapper.list(topicId, null, null, AssignmentStatus.ASSIGNMENT_STATUS_YIELDING, null);
for (QueueAssignment assignment : assignments) {
if (assignment.getQueueId() != queueId) {
continue;
}
assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED);
assignmentMapper.update(assignment);
LOGGER.info("Mark queue[topic-id={}, queue-id={}] assignable", topicId, queueId);
break;
if (!maintainLeadershipWithSharedLock(session)) {
continue;
}

QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
QueueAssignment assignment = assignmentMapper.get(topicId, queueId);
assignment.setSrcNodeId(assignment.getDstNodeId());
assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED);
assignmentMapper.update(assignment);

StreamMapper streamMapper = session.getMapper(StreamMapper.class);
StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.build();
streamMapper.updateStreamAssignment(criteria, assignment.getDstNodeId(), assignment.getDstNodeId());
session.commit();
LOGGER.info("Update status of queue assignment and stream since all its belonging streams are closed," +
" having topic-id={}, queue-id={}", topicId, queueId);

// Notify the destination node that this queue is assignable
BrokerNode node = nodes.get(assignment.getDstNodeId());
if (node != null) {
controllerClient.notifyQueueClose(node.getNode().getAddress(), topicId, queueId)
.whenComplete((res, e) -> {

Check warning on line 581 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L580-L581

Added lines #L580 - L581 were not covered by tests
if (null != e) {
future.completeExceptionally(e);

Check warning on line 583 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L583

Added line #L583 was not covered by tests
}
future.complete(null);
});
} else {

Check warning on line 587 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L585-L587

Added lines #L585 - L587 were not covered by tests
future.complete(null);
LOGGER.warn("Node[{}] is not found, can not notify it that topic-id={}, queue-id={} is assigned", assignment.getDstNodeId(), topicId, queueId);
}
}
future.complete(null);
break;
return future;
} else {
// Check if this node is the leader of the queue
try (SqlSession session = openSession()) {
QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
QueueAssignment assignment = assignmentMapper.get(topicId, queueId);

Check warning on line 597 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L595-L597

Added lines #L595 - L597 were not covered by tests
if (assignment.getSrcNodeId() == config.nodeId() && assignment.getStatus() == AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED) {
applyAssignmentChange(List.of(assignment));
return dataStore.openQueue(topicId, queueId);

Check warning on line 600 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L599-L600

Added lines #L599 - L600 were not covered by tests
}
}

// Forward to leader
Optional<String> leaderAddress = electionService.leaderAddress();
if (leaderAddress.isEmpty()) {
return CompletableFuture.failedFuture(new ControllerException(Code.NO_LEADER_VALUE, "No leader is elected yet"));
Expand All @@ -581,9 +613,9 @@
future.complete(null);
}
});
return future;

Check warning on line 616 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L616

Added line #L616 was not covered by tests
}
}
return future;
}

@Override
Expand Down Expand Up @@ -652,20 +684,35 @@
}
QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
QueueAssignment assignment = assignmentMapper.get(topicId, queueId);
assignment.setSrcNodeId(assignment.getDstNodeId());
assignment.setStatus(AssignmentStatus.ASSIGNMENT_STATUS_ASSIGNED);
assignmentMapper.update(assignment);

StreamMapper streamMapper = session.getMapper(StreamMapper.class);

StreamCriteria criteria = StreamCriteria.newBuilder()
.withTopicId(topicId)
.withQueueId(queueId)
.withState(StreamState.CLOSING)
.build();
streamMapper.updateStreamState(criteria, StreamState.CLOSED);
streamMapper.updateStreamAssignment(criteria, assignment.getDstNodeId(), assignment.getDstNodeId());
session.commit();
LOGGER.info("Update status of queue assignment and stream since all its belonging streams are closed," +
" having topic-id={}, queue-id={}", topicId, queueId);
future.complete(null);

// Notify the destination node that this queue is assignable
BrokerNode node = nodes.get(assignment.getDstNodeId());
if (node != null) {
controllerClient.notifyQueueClose(node.getNode().getAddress(), topicId, queueId)
.whenComplete((res, e) -> {

Check warning on line 706 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L705-L706

Added lines #L705 - L706 were not covered by tests
if (null != e) {
future.completeExceptionally(e);

Check warning on line 708 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L708

Added line #L708 was not covered by tests
}
future.complete(null);
});
} else {

Check warning on line 712 in controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/store/DefaultMetadataStore.java#L710-L712

Added lines #L710 - L712 were not covered by tests
future.complete(null);
LOGGER.warn("Node[{}] is not found, can not notify it that topic-id={}, queue-id={} is assigned", assignment.getDstNodeId(), topicId, queueId);
}
return future;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import com.automq.rocketmq.common.exception.ControllerException;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.store.BrokerNode;
import com.automq.rocketmq.metadata.dao.Lease;
import com.automq.rocketmq.metadata.dao.Node;
import com.automq.rocketmq.metadata.mapper.LeaseMapper;
import com.automq.rocketmq.metadata.dao.Lease;
import com.automq.rocketmq.metadata.mapper.NodeMapper;
import java.util.Calendar;
import org.apache.ibatis.session.SqlSession;
Expand Down Expand Up @@ -75,7 +75,7 @@
session.commit();
metadataStore.electionService().updateLease(update);
} else {
LOGGER.info("Node[id={}, epoch={}] is controller leader currently, expiring at {}",
LOGGER.debug("Node[id={}, epoch={}] is controller leader currently, expiring at {}",

Check warning on line 78 in controller/src/main/java/com/automq/rocketmq/controller/server/tasks/LeaseTask.java

View check run for this annotation

Codecov / codecov/patch

controller/src/main/java/com/automq/rocketmq/controller/server/tasks/LeaseTask.java#L78

Added line #L78 was not covered by tests
lease.getNodeId(), lease.getEpoch(), lease.getExpirationTime());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1259,9 +1259,15 @@ public void testOnQueueClose() throws IOException {

metadataStore.onQueueClosed(2, 1).join();

StreamMapper mapper = session.getMapper(StreamMapper.class);
Stream stream = mapper.getByStreamId(streamId);
assertEquals(stream.getState(), StreamState.CLOSED);
QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
QueueAssignment assignment = assignmentMapper.get(2, 1);
assertEquals(4, assignment.getDstNodeId());
assertEquals(4, assignment.getDstNodeId());

StreamMapper streamMapper = session.getMapper(StreamMapper.class);
Stream stream = streamMapper.getByStreamId(streamId);
assertEquals(4, stream.getDstNodeId());
assertEquals(4, stream.getDstNodeId());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
import apache.rocketmq.controller.v1.StreamState;
import com.automq.rocketmq.metadata.dao.Stream;
import com.automq.rocketmq.metadata.dao.StreamCriteria;
import org.apache.ibatis.annotations.Param;

import java.util.List;
import org.apache.ibatis.annotations.Param;

public interface StreamMapper {

Expand All @@ -42,6 +41,10 @@ public interface StreamMapper {

int updateStreamState(@Param("criteria") StreamCriteria criteria, @Param("state") StreamState state);

int updateStreamAssignment(@Param("criteria") StreamCriteria criteria,
@Param("srcNodeId") int src,
@Param("dstNodeId") int dst);

int delete(@Param("id") Long id);

List<Stream> byCriteria(@Param("criteria") StreamCriteria criteria);
Expand All @@ -51,7 +54,6 @@ public interface StreamMapper {
long queueEpoch(@Param("topicId") long topicId,
@Param("queueId") long queueId);


int planMove(@Param("criteria") StreamCriteria criteria,
@Param("srcNodeId") int src,
@Param("dstNodeId") int dst,
Expand Down
Loading
Loading