Skip to content

Commit

Permalink
feat: implement notify message queue assignable and assign it to alte…
Browse files Browse the repository at this point in the history
…rnative alive nodes (#93)

Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui authored Sep 25, 2023
1 parent c8659ce commit bd74edf
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@
import apache.rocketmq.controller.v1.ListMessageQueueReassignmentsRequest;
import apache.rocketmq.controller.v1.ListOpenStreamsRequest;
import apache.rocketmq.controller.v1.ListOpeningStreamsReply;
import apache.rocketmq.controller.v1.ListTopicMessageQueueAssignmentsReply;
import apache.rocketmq.controller.v1.ListTopicMessageQueueAssignmentsRequest;
import apache.rocketmq.controller.v1.ListTopicsReply;
import apache.rocketmq.controller.v1.ListTopicsRequest;
import apache.rocketmq.controller.v1.MessageQueue;
import apache.rocketmq.controller.v1.NodeRegistrationReply;
import apache.rocketmq.controller.v1.NodeRegistrationRequest;
import apache.rocketmq.controller.v1.NodeUnregistrationReply;
Expand Down Expand Up @@ -219,12 +218,6 @@ public void deleteTopic(DeleteTopicRequest request, StreamObserver<DeleteTopicRe
}
}

@Override
public void listTopicMessageQueues(ListTopicMessageQueueAssignmentsRequest request,
StreamObserver<ListTopicMessageQueueAssignmentsReply> responseObserver) {
super.listTopicMessageQueues(request, responseObserver);
}

@Override
public void reassignMessageQueue(ReassignMessageQueueRequest request,
StreamObserver<ReassignMessageQueueReply> responseObserver) {
Expand All @@ -234,7 +227,17 @@ public void reassignMessageQueue(ReassignMessageQueueRequest request,
@Override
public void notifyMessageQueueAssignable(NotifyMessageQueuesAssignableRequest request,
StreamObserver<NotifyMessageQueuesAssignableReply> responseObserver) {
super.notifyMessageQueueAssignable(request, responseObserver);
try {
for (MessageQueue messageQueue : request.getQueuesList()) {
this.metadataStore.markMessageQueueAssignable(messageQueue.getTopicId(), messageQueue.getQueueId());
}
NotifyMessageQueuesAssignableReply reply = NotifyMessageQueuesAssignableReply.newBuilder()
.setStatus(Status.newBuilder().setCode(Code.OK).build()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
} catch (ControllerException e) {
responseObserver.onError(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,7 @@ CompletableFuture<Node> registerBroker(String target, String name, String addres

CompletableFuture<Void> heartbeat(String target, int nodeId, long epoch,
boolean goingAway) throws ControllerException;

CompletableFuture<Void> notifyMessageQueueAssignable(String target, long topicId,
int queueId) throws ControllerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import apache.rocketmq.controller.v1.DescribeTopicRequest;
import apache.rocketmq.controller.v1.HeartbeatReply;
import apache.rocketmq.controller.v1.HeartbeatRequest;
import apache.rocketmq.controller.v1.MessageQueue;
import apache.rocketmq.controller.v1.NodeRegistrationReply;
import apache.rocketmq.controller.v1.NodeRegistrationRequest;
import apache.rocketmq.controller.v1.Code;
import apache.rocketmq.controller.v1.ControllerServiceGrpc;
import apache.rocketmq.controller.v1.NotifyMessageQueuesAssignableReply;
import apache.rocketmq.controller.v1.NotifyMessageQueuesAssignableRequest;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.database.dao.Node;
Expand Down Expand Up @@ -206,15 +209,21 @@ public CompletableFuture<Void> heartbeat(String target, int nodeId, long epoch,
buildStubForTarget(target);

ControllerServiceGrpc.ControllerServiceFutureStub stub = stubs.get(target);
HeartbeatRequest request = HeartbeatRequest.newBuilder().setId(nodeId).setEpoch(epoch).setGoingAway(goingAway).build();
HeartbeatRequest request = HeartbeatRequest
.newBuilder()
.setId(nodeId)
.setEpoch(epoch)
.setGoingAway(goingAway)
.build();
CompletableFuture<Void> future = new CompletableFuture<>();
Futures.addCallback(stub.heartbeat(request), new FutureCallback<>() {
@Override
public void onSuccess(HeartbeatReply result) {
if (result.getStatus().getCode() == Code.OK) {
future.complete(null);
} else {
future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(), result.getStatus().getMessage()));
future.completeExceptionally(
new ControllerException(result.getStatus().getCodeValue(), result.getStatus().getMessage()));
}
}

Expand All @@ -225,4 +234,37 @@ public void onFailure(@Nonnull Throwable t) {
}, MoreExecutors.directExecutor());
return future;
}

@Override
public CompletableFuture<Void> notifyMessageQueueAssignable(String target, long topicId,
int queueId) throws ControllerException {
CompletableFuture<Void> future = new CompletableFuture<>();

NotifyMessageQueuesAssignableRequest request = NotifyMessageQueuesAssignableRequest.newBuilder()
.addQueues(MessageQueue.newBuilder()
.setTopicId(topicId)
.setQueueId(queueId).build())
.build();

Futures.addCallback(buildStubForTarget(target).notifyMessageQueueAssignable(request),
new FutureCallback<>() {
@Override
public void onSuccess(NotifyMessageQueuesAssignableReply result) {
if (result.getStatus().getCode() == Code.OK) {
future.complete(null);
} else {
future.completeExceptionally(
new ControllerException(result.getStatus().getCodeValue(), result.getStatus().getMessage())
);
}
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());

return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ public interface MetadataStore extends Closeable {
*/
List<QueueAssignment> listAssignments(Long topicId, Integer srcNodeId, Integer dstNodeId,
QueueAssignmentStatus status);

void markMessageQueueAssignable(long topicId, int queueId) throws ControllerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.automq.rocketmq.controller.metadata.database.mapper.QueueAssignmentMapper;
import com.automq.rocketmq.controller.metadata.database.mapper.TopicMapper;
import com.automq.rocketmq.controller.metadata.database.tasks.LeaseTask;
import com.automq.rocketmq.controller.metadata.database.tasks.ScanAssignableMessageQueuesTask;
import com.automq.rocketmq.controller.metadata.database.tasks.ScanNodeTask;
import com.google.common.base.Strings;
import java.io.IOException;
Expand Down Expand Up @@ -91,6 +92,7 @@ public DefaultMetadataStore(ControllerClient client, SqlSessionFactory sessionFa
public void start() {
this.executorService.scheduleAtFixedRate(new LeaseTask(this), 1, config.scanIntervalInSecs(), TimeUnit.SECONDS);
this.executorService.scheduleWithFixedDelay(new ScanNodeTask(this), 1, config.scanIntervalInSecs(), TimeUnit.SECONDS);
this.executorService.scheduleAtFixedRate(new ScanAssignableMessageQueuesTask(this), 1, config.scanIntervalInSecs(), TimeUnit.SECONDS);
LOGGER.info("MetadataStore tasks scheduled");
}

Expand Down Expand Up @@ -153,6 +155,38 @@ public void keepAlive(int nodeId, long epoch, boolean goingAway) {
}
}

public boolean assignMessageQueues(List<QueueAssignment> assignments, SqlSession session) {
if (!maintainLeadershipWithSharedLock(session)) {
return false;
}

List<Integer> aliveNodeIds =
this.nodes.values()
.stream()
.filter(brokerNode ->
brokerNode.isAlive(TimeUnit.SECONDS.toMillis(config.nodeAliveIntervalInSecs())) ||
brokerNode.getNode().getId() == config.nodeId())
.map(node -> node.getNode().getId())
.toList();
if (aliveNodeIds.isEmpty()) {
return false;
}

QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);

Set<Integer> toNotify = new HashSet<>();

for (int i = 0; i < assignments.size(); i++) {
int idx = i % aliveNodeIds.size();
QueueAssignment assignment = assignments.get(i);
assignment.setDstNodeId(aliveNodeIds.get(idx));
toNotify.add(aliveNodeIds.get(idx));
assignmentMapper.update(assignment);
}
this.notifyOnResourceChange(toNotify);
return true;
}

private boolean maintainLeadershipWithSharedLock(SqlSession session) {
LeaseMapper leaseMapper = session.getMapper(LeaseMapper.class);
Lease current = leaseMapper.currentWithShareLock();
Expand Down Expand Up @@ -406,6 +440,42 @@ public List<QueueAssignment> listAssignments(Long topicId, Integer srcNodeId, In
}
}

@Override
public void markMessageQueueAssignable(long topicId, int queueId) throws ControllerException {
for (; ; ) {
if (isLeader()) {
try (SqlSession session = getSessionFactory().openSession()) {
QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
List<QueueAssignment> assignments = assignmentMapper.list(topicId, null, null, QueueAssignmentStatus.YIELDING, null);
for (QueueAssignment assignment : assignments) {
if (assignment.getQueueId() != queueId) {
continue;
}

assignment.setSrcNodeId(assignment.getDstNodeId());
assignment.setStatus(QueueAssignmentStatus.ASSIGNABLE);
assignmentMapper.update(assignment);
LOGGER.info("Mark queue[topic-id={}, queue-id={}] assignable", topicId, queueId);
break;
}
session.commit();
}
break;
} else {
try {
this.controllerClient.notifyMessageQueueAssignable(leaderAddress(), topicId, queueId).get();
} catch (InterruptedException e) {
throw new ControllerException(Code.INTERRUPTED_VALUE, e);
} catch (ExecutionException e) {
if (e.getCause() instanceof ControllerException) {
throw (ControllerException) e.getCause();
}
throw new ControllerException(Code.INTERNAL_VALUE, e);
}
}
}
}

@Override
public void close() throws IOException {
this.executorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.controller.metadata.database.tasks;

import com.automq.rocketmq.controller.metadata.database.DefaultMetadataStore;
import com.automq.rocketmq.controller.metadata.database.dao.QueueAssignment;
import com.automq.rocketmq.controller.metadata.database.dao.QueueAssignmentStatus;
import com.automq.rocketmq.controller.metadata.database.mapper.QueueAssignmentMapper;
import java.util.List;
import org.apache.ibatis.session.SqlSession;

/**
* Scan and assign the message queues that are marked assignable to alive nodes.
*/
public class ScanAssignableMessageQueuesTask extends ScanTask {

public ScanAssignableMessageQueuesTask(DefaultMetadataStore metadataStore) {
super(metadataStore);
}

@Override
public void run() {
try {
if (!metadataStore.isLeader()) {
return;
}

try (SqlSession session = metadataStore.getSessionFactory().openSession()) {
QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
List<QueueAssignment> assignments = assignmentMapper.list(null, null, null,
QueueAssignmentStatus.ASSIGNABLE, this.lastScanTime);
if (doAssign(assignments, session)) {
session.commit();
}
}
} catch (Throwable e) {
LOGGER.error("Unexpected exception raised while scan assignable message queues", e);
}
}

private boolean doAssign(List<QueueAssignment> assignments, SqlSession session) {
if (null == assignments || assignments.isEmpty()) {
return true;
}

if (!metadataStore.assignMessageQueues(assignments, session)) {
return false;
}

for (QueueAssignment assignment : assignments) {
if (null == this.lastScanTime || assignment.getUpdateTime().after(this.lastScanTime)) {
this.lastScanTime = assignment.getUpdateTime();
}
}
return true;
}
}
14 changes: 0 additions & 14 deletions controller/src/main/proto/controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,6 @@ message NotifyMessageQueuesAssignableReply {
Status status = 1;
}

message ListTopicMessageQueueAssignmentsRequest {
ControllerRequestContext context = 1;

// Use 'topic name' instead of 'topic_id' for the convenience of building toolset
string name = 2;
}

message ListTopicMessageQueueAssignmentsReply {
Status status = 1;
repeated MessageQueueAssignment assignments = 2;
}

message CommitOffsetRequest {
ControllerRequestContext context = 1;
int64 group_id = 2;
Expand Down Expand Up @@ -362,8 +350,6 @@ service ControllerService {

rpc deleteTopic(DeleteTopicRequest) returns (DeleteTopicReply) {}

rpc listTopicMessageQueues(ListTopicMessageQueueAssignmentsRequest) returns (ListTopicMessageQueueAssignmentsReply) {}

// Reassign message queue from one broker to another.
rpc reassignMessageQueue(ReassignMessageQueueRequest) returns (ReassignMessageQueueReply) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,22 @@ public void testDescribeTopic_NotFound() throws IOException, ControllerException
});
}
}

@Test
public void testNotifyMessageQueueAssignable() throws IOException, ControllerException, ExecutionException, InterruptedException {
long topicId = 1L;
int queueId = 2;
MetadataStore metadataStore = Mockito.mock(MetadataStore.class);
Mockito.doNothing().when(metadataStore)
.markMessageQueueAssignable(ArgumentMatchers.anyLong(), ArgumentMatchers.anyInt());
ControllerServiceImpl svc = new ControllerServiceImpl(metadataStore);
try (ControllerTestServer testServer = new ControllerTestServer(0, svc)) {
testServer.start();
int port = testServer.getPort();
ControllerClient client = new GrpcControllerClient();
Assertions.assertDoesNotThrow(() -> {
client.notifyMessageQueueAssignable(String.format("localhost:%d", port), topicId, queueId).get();
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,36 @@ public void testDescribeTopic() throws IOException, ControllerException {
Assertions.assertEquals(1, topic.getReassignmentsCount());
}
}

@Test
public void testMarkMessageQueueAssignable() throws IOException, ControllerException {
try (SqlSession session = getSessionFactory().openSession()) {
QueueAssignmentMapper assignmentMapper = session.getMapper(QueueAssignmentMapper.class);
QueueAssignment assignment = new QueueAssignment();
assignment.setQueueId(1);
assignment.setTopicId(2);
assignment.setStatus(QueueAssignmentStatus.YIELDING);
assignmentMapper.create(assignment);
session.commit();
}

ControllerConfig config = Mockito.mock(ControllerConfig.class);
Mockito.when(config.nodeId()).thenReturn(1);
Mockito.when(config.scanIntervalInSecs()).thenReturn(1);
Mockito.when(config.leaseLifeSpanInSecs()).thenReturn(2);
try (MetadataStore metadataStore = new DefaultMetadataStore(client, getSessionFactory(), config)) {
metadataStore.start();
Awaitility.await().with().atMost(10, TimeUnit.SECONDS).pollInterval(100, TimeUnit.MILLISECONDS)
.until(metadataStore::isLeader);
metadataStore.markMessageQueueAssignable(2, 1);

List<QueueAssignment> assignments = metadataStore.listAssignments(2L, null, null, null);
for (QueueAssignment assignment : assignments) {
if (assignment.getQueueId() == 1) {
Assertions.assertEquals(QueueAssignmentStatus.ASSIGNABLE, assignment.getStatus());
}
}
}
}

}

0 comments on commit bd74edf

Please sign in to comment.