Skip to content

Commit

Permalink
feat: implement update-group command
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui committed Oct 30, 2023
1 parent f06035b commit 089e6db
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 0 deletions.
1 change: 1 addition & 0 deletions cli/src/main/java/com/automq/rocketmq/cli/MQAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
ListTopic.class,
CreateGroup.class,
DescribeGroup.class,
UpdateGroup.class,
ListGroup.class,
ProduceMessage.class,
ConsumeMessage.class,
Expand Down
73 changes: 73 additions & 0 deletions cli/src/main/java/com/automq/rocketmq/cli/UpdateGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.cli;

import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import com.automq.rocketmq.controller.metadata.ControllerClient;
import com.automq.rocketmq.controller.metadata.GrpcControllerClient;
import com.google.common.base.Strings;
import java.util.concurrent.Callable;
import picocli.CommandLine;

@CommandLine.Command(name = "updateGroup", mixinStandardHelpOptions = true, showDefaultValues = true)
public class UpdateGroup implements Callable<Void> {

@CommandLine.ParentCommand
MQAdmin mqAdmin;

@CommandLine.Option(names = {"-i", "--id"}, description = "Group ID", required = true)
long groupId;

@CommandLine.Option(names = {"-n", "--name"}, description = "Group Name")
String name;

@CommandLine.Option(names = {"-d", "--deadLetterTopicId"}, description = "Dead Letter Topic ID")
Long deadLetterTopicId;

@CommandLine.Option(names = {"-m", "--maxDeliveryAttempt"}, description = "Max Delivery Attempt")
Integer maxDeliveryAttempt;

@CommandLine.Option(names = {"-t", "--type"}, description = "Group Name")
GroupType groupType;

@Override
public Void call() throws Exception {
try (ControllerClient client = new GrpcControllerClient(new CliClientConfig())) {
UpdateGroupRequest.Builder builder = UpdateGroupRequest.newBuilder().setGroupId(groupId);
if (!Strings.isNullOrEmpty(name)) {
builder.setName(name);
}

if (null != deadLetterTopicId) {
builder.setDeadLetterTopicId(deadLetterTopicId);
}

if (null != maxDeliveryAttempt) {
builder.setMaxRetryAttempt(maxDeliveryAttempt);
}

if (null != groupType) {
builder.setGroupType(groupType);
}

client.updateGroup(mqAdmin.endpoint, builder.build()).join();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.controller.v1.TrimStreamReply;
import apache.rocketmq.controller.v1.TrimStreamRequest;
import apache.rocketmq.controller.v1.UpdateGroupReply;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicReply;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.common.PrefixThreadFactory;
Expand Down Expand Up @@ -461,6 +463,11 @@ public void describeGroup(DescribeGroupRequest request, StreamObserver<DescribeG
}));
}

@Override
public void updateGroup(UpdateGroupRequest request, StreamObserver<UpdateGroupReply> responseObserver) {

}

@Override
public void deleteGroup(DeleteGroupRequest request, StreamObserver<DeleteGroupReply> responseObserver) {
metadataStore.deleteGroup(request.getId()).whenComplete((res, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import apache.rocketmq.controller.v1.TrimStreamReply;
import apache.rocketmq.controller.v1.TrimStreamRequest;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.database.dao.Node;
Expand Down Expand Up @@ -76,6 +77,8 @@ public interface ControllerClient extends Closeable {

CompletableFuture<ConsumerGroup> describeGroup(String target, String groupName);

CompletableFuture<Void> updateGroup(String target, UpdateGroupRequest request);

CompletableFuture<Void> deleteGroup(String target, long groupId);

void listGroups(String target, ListGroupRequest request, StreamObserver<ListGroupReply> observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.controller.v1.TrimStreamReply;
import apache.rocketmq.controller.v1.TrimStreamRequest;
import apache.rocketmq.controller.v1.UpdateGroupReply;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicReply;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.common.config.GrpcClientConfig;
Expand Down Expand Up @@ -548,6 +550,34 @@ public void onFailure(@Nonnull Throwable t) {
return future;
}

@Override
public CompletableFuture<Void> updateGroup(String target, UpdateGroupRequest request) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
try {
stub = getOrCreateStubForTarget(target);
} catch (ControllerException e) {
return CompletableFuture.failedFuture(e);
}

CompletableFuture<Void> future = new CompletableFuture<>();
Futures.addCallback(stub.updateGroup(request), new FutureCallback<>() {
@Override
public void onSuccess(UpdateGroupReply result) {
if (result.getStatus().getCode() == Code.OK) {
future.complete(null);
} else {
future.completeExceptionally(new ControllerException(result.getStatus().getCodeValue(), result.getStatus().getMessage()));
}
}

@Override
public void onFailure(@Nonnull Throwable t) {
future.completeExceptionally(t);
}
}, MoreExecutors.directExecutor());
return future;
}

@Override
public CompletableFuture<Void> deleteGroup(String target, long groupId) {
ControllerServiceGrpc.ControllerServiceFutureStub stub;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.TerminationStage;
import apache.rocketmq.controller.v1.Topic;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.ControllerConfig;
Expand Down Expand Up @@ -152,6 +153,8 @@ CompletableFuture<List<QueueAssignment>> listAssignments(Long topicId, Integer s

CompletableFuture<ConsumerGroup> describeGroup(Long groupId, String groupName);

CompletableFuture<Void> updateGroup(UpdateGroupRequest request);

/**
* Delete group with the given group id logically.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import apache.rocketmq.controller.v1.StreamRole;
import apache.rocketmq.controller.v1.StreamState;
import apache.rocketmq.controller.v1.TerminationStage;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import apache.rocketmq.controller.v1.UpdateTopicRequest;
import com.automq.rocketmq.common.PrefixThreadFactory;
import com.automq.rocketmq.common.api.DataStore;
Expand Down Expand Up @@ -448,6 +449,11 @@ public CompletableFuture<ConsumerGroup> describeGroup(Long groupId, String group
return groupManager.describeGroup(groupId, groupName);
}

@Override
public CompletableFuture<Void> updateGroup(UpdateGroupRequest request) {
return groupManager.updateGroup(request);
}

@Override
public CompletableFuture<ConsumerGroup> deleteGroup(long groupId) {
return groupManager.deleteGroup(groupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import apache.rocketmq.controller.v1.GroupStatus;
import apache.rocketmq.controller.v1.GroupType;
import apache.rocketmq.controller.v1.TopicStatus;
import apache.rocketmq.controller.v1.UpdateGroupRequest;
import com.automq.rocketmq.controller.exception.ControllerException;
import com.automq.rocketmq.controller.metadata.MetadataStore;
import com.automq.rocketmq.controller.metadata.database.cache.GroupCache;
Expand Down Expand Up @@ -273,4 +274,56 @@ public CompletableFuture<Collection<ConsumerGroup>> listGroups() {
return groups;
});
}

public CompletableFuture<Void> updateGroup(UpdateGroupRequest request) {
return CompletableFuture.supplyAsync(() -> {
for (; ; ) {
if (metadataStore.isLeader()) {
try (SqlSession session = metadataStore.openSession()) {
if (!metadataStore.maintainLeadershipWithSharedLock(session)) {
continue;
}

GroupMapper mapper = session.getMapper(GroupMapper.class);
List<Group> groups = mapper.byCriteria(GroupCriteria.newBuilder()
.setGroupId(request.getGroupId())
.setStatus(GroupStatus.GROUP_STATUS_ACTIVE)
.build());
if (groups.isEmpty()) {
String msg = String.format("Group[group-id=%d] is not found", request.getGroupId());
throw new CompletionException(new ControllerException(Code.NOT_FOUND_VALUE, msg));
}

Group group = groups.get(0);
if (request.getGroupType() != GroupType.GROUP_TYPE_UNSPECIFIED) {
group.setGroupType(request.getGroupType());
}

if (!Strings.isNullOrEmpty(request.getName())) {
group.setName(request.getName());
}

if (request.getDeadLetterTopicId() > 0) {
group.setDeadLetterTopicId(request.getDeadLetterTopicId());
}

if (request.getMaxRetryAttempt() > 0) {
group.setMaxDeliveryAttempt(request.getMaxRetryAttempt());
}
mapper.update(group);
session.commit();
}
} else {
try {
metadataStore.controllerClient().updateGroup(metadataStore.leaderAddress(), request).join();
break;
} catch (ControllerException e) {
throw new CompletionException(e);
}
}
}

return null;
});
}
}
27 changes: 27 additions & 0 deletions proto/src/main/proto/controller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,31 @@ message DescribeGroupReply {
ConsumerGroup group = 2;
}

message UpdateGroupRequest {
ControllerRequestContext context = 1;

int64 groupId = 2;

// Consumer group name
string name = 3;

// Maximum retry attempts before forwarding messages to dead-letter-queue
int32 max_retry_attempt = 4;

// Type of the group, standard for max concurrency and FIFO for message-group scope ordering
GroupType group_type = 5;

// Associated topic for dead letters.
int64 dead_letter_topic_id = 6;

// How transactional messages are consumed.
IsolationLevel isolation = 7;
}

message UpdateGroupReply {
Status status = 1;
}

message DeleteGroupRequest {
ControllerRequestContext context = 1;
int64 id = 2;
Expand Down Expand Up @@ -459,6 +484,8 @@ service ControllerService {

rpc describeGroup(DescribeGroupRequest) returns (DescribeGroupReply) {}

rpc updateGroup(UpdateGroupRequest) returns (UpdateGroupReply) {}

rpc deleteGroup(DeleteGroupRequest) returns (DeleteGroupReply) {}

rpc listGroups(ListGroupRequest) returns (stream ListGroupReply) {}
Expand Down

0 comments on commit 089e6db

Please sign in to comment.