diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index baaab3507b..d78ccf8763 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -22,7 +22,12 @@ import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.AuthorizationException; import org.apache.fluss.exception.DatabaseAlreadyExistException; import org.apache.fluss.exception.DatabaseNotEmptyException; import org.apache.fluss.exception.DatabaseNotExistException; @@ -32,10 +37,15 @@ import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.KvSnapshotNotExistException; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; +import org.apache.fluss.exception.NoRebalanceInProgressException; import org.apache.fluss.exception.NonPrimaryKeyTableException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.SchemaNotExistException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; import 
org.apache.fluss.exception.TableNotPartitionedException; @@ -56,6 +66,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -452,4 +463,90 @@ ListOffsetsResult listOffsets( * @return A CompletableFuture indicating completion of the operation. */ DropAclsResult dropAcls(Collection filters); + + /** + * Add server tag to the specified tabletServers, one tabletServer can only have one serverTag. + * + *

If adding the tag fails for any tabletServer, none of the tags will take effect. + * + *

+ * + * @param tabletServers the tabletServers we want to add server tags. + * @param serverTag the server tag to be added. + */ + CompletableFuture addServerTag(List tabletServers, ServerTag serverTag); + + /** + * Remove server tag from the specified tabletServers. + * + *

If removing the tag fails for any tabletServer, none of the tags will be removed. + * + *

    + *
  • {@link AuthorizationException} If the authenticated user doesn't have reset config + * access to the cluster. + *
  • {@link ServerNotExistException} If the tabletServer in {@code tabletServers} does not + * exist. + *
  • {@link ServerTagNotExistException} If the server tag does not exist on a tabletServer in + * {@code tabletServers}. + *
+ * + * @param tabletServers the tabletServers to remove the server tag from. + * @param serverTag the server tag to be removed. + */ + CompletableFuture removeServerTag(List tabletServers, ServerTag serverTag); + + /** + * Based on the provided {@code priorityGoals}, Fluss performs load balancing on the cluster's + * bucket load. + * + *

In more detail, Fluss collects the cluster's load information and performs load balancing + * according to the user-defined {@code priorityGoals}. + * + *

Currently, Fluss only supports one active rebalance task in the cluster. If an uncompleted + * rebalance task exists, a {@link RebalanceFailureException} will be thrown. + * + *

    + *
  • {@link AuthorizationException} If the authenticated user doesn't have reset config + * access to the cluster. + *
  • {@link RebalanceFailureException} If the rebalance failed, for example because another + * rebalance is still in progress. + *
+ * + * @param priorityGoals the goals to be optimized. + * @param dryRun if {@code true}, calculate and return the rebalance optimization proposal, but + * do not execute it. + * @return the generated rebalance plan for all the tableBuckets which need to do rebalance. + */ + CompletableFuture> rebalance( + List priorityGoals, boolean dryRun); + + /** + * List the rebalance process. + * + *
    + *
  • {@link AuthorizationException} If the authenticated user doesn't have reset config + * access to the cluster. + *
  • {@link NoRebalanceInProgressException} If there are no rebalance tasks in progress. + *
+ * + * @return the rebalance process for all the tableBuckets doing rebalance. + */ + CompletableFuture> listRebalanceProcess(); + + /** + * Cancel the rebalance task. + * + *
    + *
  • {@link AuthorizationException} If the authenticated user doesn't have reset config + * access to the cluster. + *
  • {@link NoRebalanceInProgressException} If there are no rebalance tasks in progress. + *
+ */ + CompletableFuture cancelRebalance(); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index 0f9c007aa8..3d4ea48fef 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -22,8 +22,11 @@ import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.utils.ClientRpcMessageUtils; -import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.rebalance.GoalType; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DatabaseInfo; @@ -41,6 +44,7 @@ import org.apache.fluss.rpc.gateway.AdminGateway; import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; import org.apache.fluss.rpc.messages.CreateAclsRequest; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; import org.apache.fluss.rpc.messages.CreateTableRequest; @@ -65,6 +69,7 @@ import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket; import org.apache.fluss.rpc.messages.PbPartitionSpec; import org.apache.fluss.rpc.messages.PbTablePath; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -86,7 +91,7 @@ import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec; -import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster; +import static org.apache.fluss.client.utils.MetadataUtils.sendDescribeClusterRequest; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclBindingFilters; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclFilter; @@ -120,17 +125,8 @@ public CompletableFuture> getServerNodes() { CompletableFuture.runAsync( () -> { try { - List serverNodeList = new ArrayList<>(); - Cluster cluster = - sendMetadataRequestAndRebuildCluster( - readOnlyGateway, - false, - metadataUpdater.getCluster(), - null, - null, - null); - serverNodeList.add(cluster.getCoordinatorServer()); - serverNodeList.addAll(cluster.getAliveTabletServerList()); + List serverNodeList = + sendDescribeClusterRequest(readOnlyGateway); future.complete(serverNodeList); } catch (Throwable t) { future.completeExceptionally(t); @@ -464,6 +460,37 @@ public DropAclsResult dropAcls(Collection filters) { return result; } + @Override + public CompletableFuture addServerTag(List tabletServers, ServerTag serverTag) { + AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value); + tabletServers.forEach(request::addServerId); + return gateway.addServerTag(request).thenApply(r -> null); + } + + @Override + public CompletableFuture removeServerTag( + List tabletServers, ServerTag serverTag) { + RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value); + tabletServers.forEach(request::addServerId); + return gateway.removeServerTag(request).thenApply(r -> null); + } + + @Override + public CompletableFuture> 
rebalance( + List priorityGoals, boolean dryRun) { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture> listRebalanceProcess() { + throw new UnsupportedOperationException("Support soon"); + } + + @Override + public CompletableFuture cancelRebalance() { + throw new UnsupportedOperationException("Support soon"); + } + @Override public void close() { // nothing to do yet diff --git a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java index 9ef857f178..a46e72bff2 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/utils/MetadataUtils.java @@ -21,6 +21,7 @@ import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.StaleMetadataException; import org.apache.fluss.metadata.PhysicalTablePath; @@ -31,6 +32,8 @@ import org.apache.fluss.rpc.GatewayClientProxy; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; +import org.apache.fluss.rpc.messages.DescribeClusterRequest; +import org.apache.fluss.rpc.messages.DescribeClusterResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbBucketMetadata; @@ -165,6 +168,22 @@ public static Cluster sendMetadataRequestAndRebuildCluster( // time out here } + public static List sendDescribeClusterRequest(AdminReadOnlyGateway gateway) + throws ExecutionException, InterruptedException, TimeoutException { + DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest(); + return gateway.describeCluster(describeClusterRequest) + 
.thenApply( + response -> { + List serverNodes = new ArrayList<>(); + serverNodes.add(getCoordinatorServer(response)); + serverNodes.addAll(getAliveTabletServers(response)); + return serverNodes; + }) + .get(30, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in + // RpcClient, it will let the get() block forever. So we + // time out here + } + private static NewTableMetadata getTableMetadataToUpdate( Cluster cluster, MetadataResponse metadataResponse) { Map newTablePathToTableId = new HashMap<>(); @@ -277,6 +296,19 @@ private static ServerNode getCoordinatorServer(MetadataResponse response) { } } + private static ServerNode getCoordinatorServer(DescribeClusterResponse response) { + if (!response.hasCoordinatorServer()) { + return null; + } else { + PbServerNode protoServerNode = response.getCoordinatorServer(); + return new ServerNode( + protoServerNode.getNodeId(), + protoServerNode.getHost(), + protoServerNode.getPort(), + ServerType.COORDINATOR); + } + } + private static Map getAliveTabletServers(MetadataResponse response) { Map aliveTabletServers = new HashMap<>(); response.getTabletServersList() @@ -295,6 +327,25 @@ private static Map getAliveTabletServers(MetadataResponse r return aliveTabletServers; } + public static List getAliveTabletServers(DescribeClusterResponse response) { + List aliveTabletServers = new ArrayList<>(); + response.getTabletServersList() + .forEach( + serverNode -> { + aliveTabletServers.add( + new ServerNode( + serverNode.getNodeId(), + serverNode.getHost(), + serverNode.getPort(), + ServerType.TABLET_SERVER, + serverNode.hasRack() ? serverNode.getRack() : null, + serverNode.hasServerTag() + ? 
ServerTag.valueOf(serverNode.getServerTag()) + : null)); + }); + return aliveTabletServers; + } + private static List toBucketLocations( TablePath tablePath, long tableId, diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index bf45d5974a..7951035157 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -24,6 +24,7 @@ import org.apache.fluss.client.table.Table; import org.apache.fluss.client.table.writer.UpsertWriter; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.AutoPartitionTimeUnit; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -39,6 +40,9 @@ import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; import org.apache.fluss.exception.SchemaNotExistException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.TableNotExistException; import org.apache.fluss.exception.TableNotPartitionedException; import org.apache.fluss.exception.TooManyBucketsException; @@ -60,6 +64,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; import org.apache.fluss.server.kv.snapshot.KvSnapshotHandle; +import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.BeforeEach; @@ -1050,4 +1055,64 @@ public void testSystemsColumns() throws Exception { + "Please use other names for these columns. 
" + "The reserved system columns are: __offset, __timestamp, __bucket"); } + + @Test + public void testAddAndRemoveServerTags() throws Exception { + ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(); + // 1.add server tag to a none exists server. + assertThatThrownBy( + () -> + admin.addServerTag( + Collections.singletonList(100), + ServerTag.PERMANENT_OFFLINE) + .get()) + .cause() + .isInstanceOf(ServerNotExistException.class) + .hasMessageContaining("Server 100 not exists when trying to add server tag."); + + // 2.add server tag for server 0,1. + admin.addServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get(); + assertThat(admin.getServerNodes().get()) + .filteredOn(serverNode -> serverNode.serverTag() != null) + .extracting(ServerNode::id) + .containsExactlyInAnyOrder(0, 1); + + // 3.add server tag for server 0,2. error will be thrown and tag for 2 will not be added. + assertThatThrownBy( + () -> + admin.addServerTag(Arrays.asList(0, 2), ServerTag.PERMANENT_OFFLINE) + .get()) + .cause() + .isInstanceOf(ServerTagAlreadyExistException.class) + .hasMessageContaining("Server tag PERMANENT_OFFLINE already exists for server 0."); + + // 4.remove server tag for server 100 + assertThatThrownBy( + () -> + admin.removeServerTag( + Collections.singletonList(100), + ServerTag.PERMANENT_OFFLINE) + .get()) + .cause() + .isInstanceOf(ServerNotExistException.class) + .hasMessageContaining("Server 100 not exists when trying to removing server tag."); + + // 5.remove server tag for server 0,1. + admin.removeServerTag(Arrays.asList(0, 1), ServerTag.PERMANENT_OFFLINE).get(); + assertThat(admin.getServerNodes().get()) + .filteredOn(serverNode -> serverNode.serverTag() != null) + .extracting(ServerNode::id) + .isEmpty(); + + // 6.remove server tag for server 2. error will be thrown and tag for 2 will not be removed. 
+ assertThatThrownBy( + () -> + admin.removeServerTag( + Collections.singletonList(0), + ServerTag.PERMANENT_OFFLINE) + .get()) + .cause() + .isInstanceOf(ServerTagNotExistException.class) + .hasMessageContaining("Server tag PERMANENT_OFFLINE not exists for server 0."); + } } diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerNode.java b/fluss-common/src/main/java/org/apache/fluss/cluster/ServerNode.java index a41cce4016..643b89020b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/cluster/ServerNode.java +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/ServerNode.java @@ -18,6 +18,7 @@ package org.apache.fluss.cluster; import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.cluster.rebalance.ServerTag; import javax.annotation.Nullable; @@ -39,6 +40,9 @@ public class ServerNode { /** rack info for ServerNode. Currently, only tabletServer has rack info. */ private final @Nullable String rack; + /** Sever tag of tabletServer. */ + private final @Nullable ServerTag serverTag; + // Cache hashCode as it is called in performance sensitive parts of the code (e.g. // RecordAccumulator.ready) private Integer hash; @@ -48,11 +52,22 @@ public ServerNode(int id, String host, int port, ServerType serverType) { } public ServerNode(int id, String host, int port, ServerType serverType, @Nullable String rack) { + this(id, host, port, serverType, rack, null); + } + + public ServerNode( + int id, + String host, + int port, + ServerType serverType, + @Nullable String rack, + @Nullable ServerTag serverTag) { this.id = id; this.host = host; this.port = port; this.serverType = serverType; this.rack = rack; + this.serverTag = serverTag; if (serverType == ServerType.COORDINATOR) { this.uid = "cs-" + id; } else { @@ -96,6 +111,11 @@ public ServerType serverType() { return rack; } + /** The server tag for this node. 
*/ + public @Nullable ServerTag serverTag() { + return serverTag; + } + /** * Check whether this node is empty, which may be the case if noNode() is used as a placeholder * in a response payload with an error. @@ -115,6 +135,7 @@ public int hashCode() { result = 31 * result + port; result = 31 * result + serverType.hashCode(); result = 31 * result + ((rack == null) ? 0 : rack.hashCode()); + result = 31 * result + ((serverTag == null) ? 0 : serverTag.hashCode()); this.hash = result; return result; } else { @@ -135,11 +156,12 @@ public boolean equals(Object obj) { && port == other.port && Objects.equals(host, other.host) && serverType == other.serverType - && Objects.equals(rack, other.rack); + && Objects.equals(rack, other.rack) + && Objects.equals(serverTag, other.serverTag); } @Override public String toString() { - return host + ":" + port + " (id: " + uid + ", rack: " + rack + ")"; + return host + ":" + port + " (id: " + uid + ", rack: " + rack + ", tag: " + serverTag + ")"; } } diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java new file mode 100644 index 0000000000..130b75cf6b --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/GoalType.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.Arrays; + +/** + * The type of goal to optimize. + * + * @since 0.8 + */ +@PublicEvolving +public enum GoalType { + /** + * Goal to generate replica movement tasks to ensure that the number of replicas on each + * tabletServer is near balanced. + */ + REPLICA_DISTRIBUTION_GOAL(0), + + /** + * Goal to generate leadership movement and leader replica movement tasks to ensure that the + * number of leader replicas on each tabletServer is near balanced. + */ + LEADER_REPLICA_DISTRIBUTION_GOAL(1), + + /** Goal to move the leaders to the first replica of each tableBuckets. 
*/ + PREFERRED_LEADER_GOAL(2); + + public final int value; + + GoalType(int value) { + this.value = value; + } + + public static GoalType valueOf(int value) { + if (value == REPLICA_DISTRIBUTION_GOAL.value) { + return REPLICA_DISTRIBUTION_GOAL; + } else if (value == LEADER_REPLICA_DISTRIBUTION_GOAL.value) { + return LEADER_REPLICA_DISTRIBUTION_GOAL; + } else if (value == PREFERRED_LEADER_GOAL.value) { + return PREFERRED_LEADER_GOAL; + } else { + throw new IllegalArgumentException( + String.format( + "Value %s must be one of %s", value, Arrays.asList(GoalType.values()))); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java new file mode 100644 index 0000000000..bedce22663 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalancePlanForBucket.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; + +import java.util.List; +import java.util.Objects; + +/** + * a generated rebalance plan for a tableBucket. + * + * @since 0.8 + */ +@PublicEvolving +public class RebalancePlanForBucket { + private final TableBucket tableBucket; + private final int originalLeader; + private final int newLeader; + private final List originReplicas; + private final List newReplicas; + + public RebalancePlanForBucket( + TableBucket tableBucket, + int originalLeader, + int newLeader, + List originReplicas, + List newReplicas) { + this.tableBucket = tableBucket; + this.originalLeader = originalLeader; + this.newLeader = newLeader; + this.originReplicas = originReplicas; + this.newReplicas = newReplicas; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public int getBucketId() { + return tableBucket.getBucket(); + } + + public Integer getOriginalLeader() { + return originalLeader; + } + + public Integer getNewLeader() { + return newLeader; + } + + public List getOriginReplicas() { + return originReplicas; + } + + public List getNewReplicas() { + return newReplicas; + } + + @Override + public String toString() { + return "RebalancePlanForBucket{" + + "tableBucket=" + + tableBucket + + ", originalLeader=" + + originalLeader + + ", newLeader=" + + newLeader + + ", originReplicas=" + + originReplicas + + ", newReplicas=" + + newReplicas + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RebalancePlanForBucket that = (RebalancePlanForBucket) o; + return Objects.equals(tableBucket, that.tableBucket) + && originalLeader == that.originalLeader + && newLeader == that.newLeader + && Objects.equals(originReplicas, that.originReplicas) + && Objects.equals(newReplicas, that.newReplicas); + } + + 
@Override + public int hashCode() { + return Objects.hash(tableBucket, originalLeader, newLeader, originReplicas, newReplicas); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java new file mode 100644 index 0000000000..c477e524b3 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceResultForBucket.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; +import org.apache.fluss.metadata.TableBucket; + +import java.util.List; + +/** + * Status of rebalance process for a tabletBucket. 
+ * + * @since 0.8 + */ +@PublicEvolving +public class RebalanceResultForBucket { + private final RebalancePlanForBucket rebalancePlanForBucket; + private RebalanceStatusForBucket rebalanceStatusForBucket; + + public RebalanceResultForBucket( + RebalancePlanForBucket rebalancePlanForBucket, + RebalanceStatusForBucket rebalanceStatusForBucket) { + this.rebalancePlanForBucket = rebalancePlanForBucket; + this.rebalanceStatusForBucket = rebalanceStatusForBucket; + } + + public TableBucket tableBucket() { + return rebalancePlanForBucket.getTableBucket(); + } + + public RebalancePlanForBucket planForBucket() { + return rebalancePlanForBucket; + } + + public List newReplicas() { + return rebalancePlanForBucket.getNewReplicas(); + } + + public RebalanceResultForBucket setNewStatus(RebalanceStatusForBucket status) { + this.rebalanceStatusForBucket = status; + return this; + } + + public RebalanceStatusForBucket status() { + return rebalanceStatusForBucket; + } + + public static RebalanceResultForBucket of( + RebalancePlanForBucket planForBucket, RebalanceStatusForBucket status) { + return new RebalanceResultForBucket(planForBucket, status); + } + + @Override + public String toString() { + return "RebalanceResultForBucket{" + + "rebalancePlanForBucket=" + + rebalancePlanForBucket + + ", rebalanceStatusForBucket=" + + rebalanceStatusForBucket + + '}'; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java new file mode 100644 index 0000000000..e8c0e46733 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/RebalanceStatusForBucket.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Rebalance status for single bucket. + * + * @since 0.8 + */ +@PublicEvolving +public enum RebalanceStatusForBucket { + PENDING(1), + REBALANCING(2), + FAILED(3), + COMPLETED(4); + + private final int code; + + RebalanceStatusForBucket(int code) { + this.code = code; + } + + public static RebalanceStatusForBucket of(int code) { + for (RebalanceStatusForBucket status : RebalanceStatusForBucket.values()) { + if (status.code == code) { + return status; + } + } + return null; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java new file mode 100644 index 0000000000..5e20b34f73 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/cluster/rebalance/ServerTag.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.cluster.rebalance; + +import org.apache.fluss.annotation.PublicEvolving; + +import java.util.Arrays; + +/** + * The tag of tabletServer. + * + * @since 0.8 + */ +@PublicEvolving +public enum ServerTag { + /** + * The tabletServer is permanently offline. Such as the host where the tabletServer on is + * upcoming decommissioning. + */ + PERMANENT_OFFLINE(0), + + /** The tabletServer is temporarily offline. Such as the tabletServer is upcoming upgrading. */ + TEMPORARY_OFFLINE(1); + + public final int value; + + ServerTag(int value) { + this.value = value; + } + + public static ServerTag valueOf(int value) { + if (value == PERMANENT_OFFLINE.value) { + return PERMANENT_OFFLINE; + } else if (value == TEMPORARY_OFFLINE.value) { + return TEMPORARY_OFFLINE; + } else { + throw new IllegalArgumentException( + String.format( + "Value %s must be one of %s", + value, Arrays.asList(ServerTag.values()))); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java b/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java new file mode 100644 index 0000000000..8b052a5100 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/NoRebalanceInProgressException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Thrown when the rebalance process is listed but no rebalance task is in progress.
 *
 * @since 0.8
 */
@PublicEvolving
public class NoRebalanceInProgressException extends ApiException {
    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with the given detail message.
     *
     * @param message the detail message, passed on to {@link ApiException}
     */
    public NoRebalanceInProgressException(String message) {
        super(message);
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +import org.apache.fluss.annotation.VisibleForTesting; + +/** + * This exception is thrown if rebalance failed. + * + * @since 0.8 + */ +@VisibleForTesting +public class RebalanceFailureException extends ApiException { + private static final long serialVersionUID = 1L; + + public RebalanceFailureException(String message) { + super(message); + } + + public RebalanceFailureException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java new file mode 100644 index 0000000000..2bdbe621e8 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ServerNotExistException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Thrown if a server does not exist in the cluster.
 *
 * @since 0.8
 */
@PublicEvolving
public class ServerNotExistException extends ApiException {
    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with the given detail message.
     *
     * @param message the detail message, passed on to {@link ApiException}
     */
    public ServerNotExistException(String message) {
        super(message);
    }
}
/**
 * Thrown if a server tag already exists for the specified tabletServer in the cluster.
 *
 * @since 0.8
 */
@PublicEvolving
public class ServerTagAlreadyExistException extends ApiException {
    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with the given detail message.
     *
     * @param message the detail message, passed on to {@link ApiException}
     */
    public ServerTagAlreadyExistException(String message) {
        super(message);
    }
}
/**
 * Thrown if a server tag does not exist for the specified tabletServer in the cluster.
 *
 * @since 0.8
 */
@PublicEvolving
public class ServerTagNotExistException extends ApiException {

    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with the given detail message.
     *
     * @param message the detail message, passed on to {@link ApiException}
     */
    public ServerTagNotExistException(String message) {
        super(message);
    }
}
@@ import org.apache.fluss.rpc.messages.DropPartitionResponse; import org.apache.fluss.rpc.messages.DropTableRequest; import org.apache.fluss.rpc.messages.DropTableResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.rpc.protocol.RPC; @@ -104,6 +114,22 @@ public interface AdminGateway extends AdminReadOnlyGateway { @RPC(api = ApiKeys.DROP_ACLS) CompletableFuture dropAcls(DropAclsRequest request); + @RPC(api = ApiKeys.ADD_SERVER_TAG) + CompletableFuture addServerTag(AddServerTagRequest request); + + @RPC(api = ApiKeys.REMOVE_SERVER_TAG) + CompletableFuture removeServerTag(RemoveServerTagRequest request); + + @RPC(api = ApiKeys.REBALANCE) + CompletableFuture rebalance(RebalanceRequest request); + + @RPC(api = ApiKeys.LIST_REBALANCE_PROCESS) + CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request); + + @RPC(api = ApiKeys.CANCEL_REBALANCE) + CompletableFuture cancelRebalance(CancelRebalanceRequest request); + // todo: rename table & alter table } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java index 958654caae..e73a14b5cc 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java @@ -20,6 +20,8 @@ import org.apache.fluss.rpc.RpcGateway; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import 
org.apache.fluss.rpc.messages.DescribeClusterRequest; +import org.apache.fluss.rpc.messages.DescribeClusterResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; import org.apache.fluss.rpc.messages.GetDatabaseInfoResponse; import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; @@ -187,4 +189,13 @@ CompletableFuture getLatestLakeSnapshot( */ @RPC(api = ApiKeys.LIST_ACLS) CompletableFuture listAcls(ListAclsRequest request); + + /** + * Get the server node information of the current cluster. + * + * @param request Describe cluster request + * @return a future returns server node + */ + @RPC(api = ApiKeys.DESCRIBE_CLUSTER) + CompletableFuture describeCluster(DescribeClusterRequest request); } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 0bc2d494a8..b232ce4e6f 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -70,7 +70,13 @@ public enum ApiKeys { CREATE_ACLS(1039, 0, 0, PUBLIC), LIST_ACLS(1040, 0, 0, PUBLIC), DROP_ACLS(1041, 0, 0, PUBLIC), - LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE); + LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE), + ADD_SERVER_TAG(1043, 0, 0, PUBLIC), + REMOVE_SERVER_TAG(1044, 0, 0, PUBLIC), + REBALANCE(1045, 0, 0, PUBLIC), + LIST_REBALANCE_PROCESS(1046, 0, 0, PUBLIC), + CANCEL_REBALANCE(1047, 0, 0, PUBLIC), + DESCRIBE_CLUSTER(1048, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 5edb3ac833..0bb6715937 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -48,6 +48,7 @@ import 
org.apache.fluss.exception.LogOffsetOutOfRangeException; import org.apache.fluss.exception.LogStorageException; import org.apache.fluss.exception.NetworkException; +import org.apache.fluss.exception.NoRebalanceInProgressException; import org.apache.fluss.exception.NonPrimaryKeyTableException; import org.apache.fluss.exception.NotEnoughReplicasAfterAppendException; import org.apache.fluss.exception.NotEnoughReplicasException; @@ -56,11 +57,15 @@ import org.apache.fluss.exception.OutOfOrderSequenceException; import org.apache.fluss.exception.PartitionAlreadyExistsException; import org.apache.fluss.exception.PartitionNotExistException; +import org.apache.fluss.exception.RebalanceFailureException; import org.apache.fluss.exception.RecordTooLargeException; import org.apache.fluss.exception.RetriableAuthenticationException; import org.apache.fluss.exception.SchemaNotExistException; import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.SecurityTokenException; +import org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; import org.apache.fluss.exception.StorageException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotExistException; @@ -214,7 +219,14 @@ public enum Errors { INVALID_SERVER_RACK_INFO_EXCEPTION( 52, "The server rack info is invalid.", InvalidServerRackInfoException::new), LAKE_SNAPSHOT_NOT_EXIST( - 53, "The lake snapshot is not exist.", LakeTableSnapshotNotExistException::new); + 53, "The lake snapshot is not exist.", LakeTableSnapshotNotExistException::new), + SERVER_NOT_EXIST_EXCEPTION(54, "The server is not exist.", ServerNotExistException::new), + SEVER_TAG_ALREADY_EXIST_EXCEPTION( + 55, "The server tag already exist.", ServerTagAlreadyExistException::new), + SEVER_TAG_NOT_EXIST_EXCEPTION(56, "The server tag not exist.", 
ServerTagNotExistException::new), + REBALANCE_FAILURE_EXCEPTION(57, "The rebalance task failure.", RebalanceFailureException::new), + NO_REBALANCE_IN_PROGRESS_EXCEPTION( + 58, "No rebalance task in progress.", NoRebalanceInProgressException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 2ddaa44e8a..be2a2ecefb 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -178,6 +178,15 @@ message UpdateMetadataRequest { message UpdateMetadataResponse { } +// describe cluster request and response +message DescribeClusterRequest { +} + +message DescribeClusterResponse { + optional PbServerNode coordinator_server = 1; + repeated PbServerNode tablet_servers = 2; +} + // produce log request and response message ProduceLogRequest { required int32 acks = 1; @@ -530,6 +539,43 @@ message LakeTieringHeartbeatResponse { repeated PbHeartbeatRespForTable failed_table_resp = 5; } +message AddServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message AddServerTagResponse { +} + +message RemoveServerTagRequest { + repeated int32 server_ids = 1 [packed = true]; + required int32 server_tag = 2; +} + +message RemoveServerTagResponse { +} + +message RebalanceRequest { + repeated int32 goals = 1 [packed = true]; + required bool dry_run = 2; +} + +message RebalanceResponse { + repeated PbRebalancePlanForTable plan_for_table = 1; +} + +message ListRebalanceProcessRequest { +} + +message ListRebalanceProcessResponse { + repeated PbRebalanceProcessForTable process_for_table = 1; +} + +message CancelRebalanceRequest { +} + +message CancelRebalanceResponse { +} // --------------- Inner classes ---------------- message PbApiVersion { @@ -554,12 +600,14 @@ message PbPhysicalTablePath { // * versions <= 0.6: host and port are used. 
// * versions >= 0.7: listeners is used to replace host and port. // For MetadataResponse and UpdateMetadataRequest: Fluss versions >= 0.7: we add rack for each tabletServer +// For DescribeClusterResponse: Fluss version >= 0.8: we add server_tag for each tabletServer message PbServerNode { required int32 node_id = 1; required string host = 2; required int32 port = 3; optional string listeners = 4; optional string rack = 5; + optional int32 server_tag = 6; } message PbTableMetadata { @@ -867,4 +915,42 @@ message PbHeartbeatReqForTable { message PbHeartbeatRespForTable { required int64 table_id = 1; optional ErrorResponse error = 2; +} + +message PbRebalancePlanForTable { + required int64 table_id = 1; + repeated PbRebalancePlanForPartition partitions_plan = 2; // for non-partitioned table, this is empty + repeated PbRebalancePlanForBucket buckets_plan = 3; // for partitioned table, this is empty + +} + +message PbRebalancePlanForPartition { + required int64 partition_id = 1; + repeated PbRebalancePlanForBucket buckets_plan = 2; +} + +message PbRebalancePlanForBucket { + required int32 bucket_id = 1; + optional int32 original_leader = 2; + optional int32 new_leader = 3; + repeated int32 original_replicas = 4 [packed = true]; + repeated int32 new_replicas = 5 [packed = true]; +} + +message PbRebalanceProcessForTable { + required int64 table_id = 1; + repeated PbRebalanceProcessForPartition partitions_process = 2; + repeated PbRebalanceProcessForBucket buckets_process = 3; +} + +message PbRebalanceProcessForPartition { + required int64 partition_id = 1; + repeated PbRebalanceProcessForBucket buckets_process = 2; +} + +message PbRebalanceProcessForBucket { + required int32 bucket_id = 1; + repeated int32 original_replicas = 2 [packed = true]; + repeated int32 new_replicas = 3 [packed = true]; + required int32 rebalance_status = 4; } \ No newline at end of file diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 2a60bef37d..8646a1e54c 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -21,6 +21,8 @@ import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeClusterRequest; +import org.apache.fluss.rpc.messages.DescribeClusterResponse; import org.apache.fluss.rpc.messages.FetchLogRequest; import org.apache.fluss.rpc.messages.FetchLogResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; @@ -241,4 +243,10 @@ public CompletableFuture getLatestLakeSnapshot( public CompletableFuture listAcls(ListAclsRequest request) { return null; } + + @Override + public CompletableFuture describeCluster( + DescribeClusterRequest request) { + return null; + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index bbf135bf4b..6f502216b8 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -17,7 +17,9 @@ package org.apache.fluss.server; +import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.KvSnapshotNotExistException; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; @@ -42,6 +44,8 @@ import org.apache.fluss.rpc.messages.ApiVersionsResponse; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import 
org.apache.fluss.rpc.messages.DescribeClusterRequest; +import org.apache.fluss.rpc.messages.DescribeClusterResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; import org.apache.fluss.rpc.messages.GetDatabaseInfoResponse; import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; @@ -92,6 +96,7 @@ import org.apache.fluss.server.zk.data.BucketSnapshot; import org.apache.fluss.server.zk.data.LakeTableSnapshot; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.slf4j.Logger; @@ -113,6 +118,7 @@ import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclFilter; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toResolvedPartitionSpec; import static org.apache.fluss.security.acl.Resource.TABLE_SPLITTER; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.buildDescribeClusterResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.buildMetadataResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeGetLatestKvSnapshotsResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeGetLatestLakeSnapshotResponse; @@ -466,6 +472,15 @@ public CompletableFuture listAcls(ListAclsRequest request) { } } + @Override + public CompletableFuture describeCluster( + DescribeClusterRequest request) { + return CompletableFuture.completedFuture( + makeDescribeClusterResponse(currentListenerName(), getServerMetadataCache())); + } + + protected abstract ServerMetadataCache getServerMetadataCache(); + protected MetadataResponse makeMetadataResponse( MetadataRequest request, String listenerName, @@ -533,6 +548,42 @@ protected MetadataResponse makeMetadataResponse( partitionMetadata); } + protected DescribeClusterResponse makeDescribeClusterResponse( + String listenerName, ServerMetadataCache metadataCache) { + ServerNode coordinatorServer = 
metadataCache.getCoordinatorServer(listenerName); + + Collection aliveTabletServers = + metadataCache.getAllAliveTabletServers(listenerName).values(); + Set aliveTabletServersWithTag = new HashSet<>(aliveTabletServers.size()); + try { + Optional serverTagsOp = zkClient.getServerTags(); + if (serverTagsOp.isPresent()) { + Map tagMap = serverTagsOp.get().getServerTags(); + for (ServerNode server : aliveTabletServers) { + if (tagMap.containsKey(server.id())) { + ServerNode serverWithTag = + new ServerNode( + server.id(), + server.host(), + server.port(), + server.serverType(), + server.rack(), + tagMap.get(server.id())); + aliveTabletServersWithTag.add(serverWithTag); + } else { + aliveTabletServersWithTag.add(server); + } + } + } else { + aliveTabletServersWithTag.addAll(aliveTabletServers); + } + } catch (Exception e) { + throw new FlussRuntimeException("Failed to get server tags", e); + } + + return buildDescribeClusterResponse(coordinatorServer, aliveTabletServersWithTag); + } + public static List getTableMetadataFromZk( ZooKeeperClient zkClient, TablePath tablePath, long tableId, boolean isPartitioned) { try { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index aa372f56a4..4a2df6001e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -18,6 +18,7 @@ package org.apache.fluss.server.coordinator; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableBucketReplica; @@ -101,6 +102,9 @@ public class CoordinatorContext { */ private final Map> replicasOnOffline = new HashMap<>(); + /** A mapping from 
tabletServers to server tag. */ + private final Map serverTags = new HashMap<>(); + private ServerInfo coordinatorServerInfo = null; private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH; @@ -616,6 +620,26 @@ public void removePartition(TablePartition tablePartition) { } } + public void initSeverTags(Map initialServerTags) { + serverTags.putAll(initialServerTags); + } + + public void putServerTag(int serverId, ServerTag serverTag) { + serverTags.put(serverId, serverTag); + } + + public Map getServerTags() { + return new HashMap<>(serverTags); + } + + public Optional getServerTag(int serverId) { + return Optional.ofNullable(serverTags.get(serverId)); + } + + public void removeServerTag(int serverId) { + serverTags.remove(serverId); + } + private void clearTablesState() { tableAssignments.clear(); partitionAssignments.clear(); @@ -636,5 +660,6 @@ public void resetContext() { clearTablesState(); // clear the live tablet servers liveTabletServers.clear(); + serverTags.clear(); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index ffc6192b72..a5bec1f4ba 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -21,12 +21,17 @@ import org.apache.fluss.cluster.Endpoint; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FencedLeaderEpochException; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidUpdateVersionException; +import 
org.apache.fluss.exception.ServerNotExistException; +import org.apache.fluss.exception.ServerTagAlreadyExistException; +import org.apache.fluss.exception.ServerTagNotExistException; +import org.apache.fluss.exception.UnknownServerException; import org.apache.fluss.exception.UnknownTableOrBucketException; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; @@ -34,13 +39,16 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePartition; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AdjustIsrResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse; import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.server.coordinator.event.AccessContextEvent; +import org.apache.fluss.server.coordinator.event.AddServerTagEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; @@ -57,6 +65,7 @@ import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent; import org.apache.fluss.server.coordinator.event.NewTabletServerEvent; import org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent; +import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher; import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher; import org.apache.fluss.server.coordinator.statemachine.ReplicaStateMachine; @@ -77,6 
+86,7 @@ import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TabletServerRegistration; import org.apache.fluss.server.zk.data.ZkData.PartitionIdsZNode; @@ -298,6 +308,11 @@ private void initCoordinatorContext() throws Exception { // init tablet server channels coordinatorChannelManager.startup(internalServerNodes); + // load server tags. + zooKeeperClient + .getServerTags() + .ifPresent(tags -> coordinatorContext.initSeverTags(tags.getServerTags())); + // load all tables List autoPartitionTables = new ArrayList<>(); List> lakeTables = new ArrayList<>(); @@ -470,6 +485,16 @@ public void process(CoordinatorEvent event) { completeFromCallable( commitLakeTableSnapshotEvent.getRespCallback(), () -> tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent)); + } else if (event instanceof AddServerTagEvent) { + AddServerTagEvent addServerTagEvent = (AddServerTagEvent) event; + completeFromCallable( + addServerTagEvent.getRespCallback(), + () -> processAddServerTag(addServerTagEvent)); + } else if (event instanceof RemoveServerTagEvent) { + RemoveServerTagEvent removeServerTagEvent = (RemoveServerTagEvent) event; + completeFromCallable( + removeServerTagEvent.getRespCallback(), + () -> processRemoveServerTag(removeServerTagEvent)); } else if (event instanceof AccessContextEvent) { AccessContextEvent accessContextEvent = (AccessContextEvent) event; processAccessContext(accessContextEvent); @@ -827,6 +852,90 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader); } + private AddServerTagResponse processAddServerTag(AddServerTagEvent event) { + AddServerTagResponse addServerTagResponse = new 
AddServerTagResponse(); + List serverIds = event.getServerIds(); + ServerTag serverTag = event.getServerTag(); + + // Validate all input serverIds up front: each must be a live tablet server without a tag. + // If any check fails, throw an error so nothing is written to coordinatorContext and zk. + Map liveTabletServers = coordinatorContext.getLiveTabletServers(); + for (Integer serverId : serverIds) { + if (!liveTabletServers.containsKey(serverId)) { + throw new ServerNotExistException( + String.format( + "Server %s not exists when trying to add server tag.", serverId)); + } + + if (coordinatorContext.getServerTag(serverId).isPresent()) { + throw new ServerTagAlreadyExistException( + String.format( + "Server tag %s already exists for server %s.", + serverTag, serverId)); + } + } + + // First register to zk, and then update coordinatorContext. + Map serverTags = coordinatorContext.getServerTags(); + for (Integer serverId : serverIds) { + serverTags.put(serverId, serverTag); + } + + try { + zooKeeperClient.registerServerTags(new ServerTags(serverTags)); + } catch (Exception e) { + LOG.error("Error when register server tags to zookeeper.", e); + throw new UnknownServerException("Error when register server tags to zookeeper.", e); + } + + // Then update coordinatorContext. + serverIds.forEach(serverId -> coordinatorContext.putServerTag(serverId, serverTag)); + + return addServerTagResponse; + } + + private RemoveServerTagResponse processRemoveServerTag(RemoveServerTagEvent event) { + RemoveServerTagResponse removeServerTagResponse = new RemoveServerTagResponse(); + List serverIds = event.getServerIds(); + ServerTag serverTag = event.getServerTag(); + + // Validate all input serverIds up front: each must be a live tablet server with a tag. + // If any check fails, throw an error so nothing is removed from coordinatorContext and zk. 
+ Map liveTabletServers = coordinatorContext.getLiveTabletServers(); + for (Integer serverId : serverIds) { + if (!liveTabletServers.containsKey(serverId)) { + throw new ServerNotExistException( + String.format( + "Server %s not exists when trying to removing server tag.", + serverId)); + } + + if (!coordinatorContext.getServerTag(serverId).isPresent()) { + throw new ServerTagNotExistException( + String.format( + "Server tag %s not exists for server %s.", serverTag, serverId)); + } + } + + // First register to zk, and then update coordinatorContext. + Map serverTags = coordinatorContext.getServerTags(); + for (Integer serverId : serverIds) { + serverTags.remove(serverId); + } + + try { + zooKeeperClient.registerServerTags(new ServerTags(serverTags)); + } catch (Exception e) { + LOG.error("Error when register server tags to zookeeper.", e); + throw new UnknownServerException("Error when register server tags to zookeeper.", e); + } + + // Then update coordinatorContext. + serverIds.forEach(coordinatorContext::removeServerTag); + + return removeServerTagResponse; + } + private List tryProcessAdjustIsr( Map leaderAndIsrList) { // TODO verify leader epoch. 
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 7253ef9626..c09cb6c410 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -19,6 +19,7 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.cluster.TabletServerInfo; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.InvalidCoordinatorException; @@ -40,8 +41,12 @@ import org.apache.fluss.metadata.TablePartition; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.AdjustIsrResponse; +import org.apache.fluss.rpc.messages.CancelRebalanceRequest; +import org.apache.fluss.rpc.messages.CancelRebalanceResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; @@ -66,10 +71,16 @@ import org.apache.fluss.rpc.messages.DropTableResponse; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable; import 
org.apache.fluss.rpc.messages.PbHeartbeatRespForTable; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.netty.server.Session; import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.security.acl.AclBinding; @@ -81,11 +92,13 @@ import org.apache.fluss.server.authorizer.AclDeleteResult; import org.apache.fluss.server.authorizer.Authorizer; import org.apache.fluss.server.coordinator.event.AccessContextEvent; +import org.apache.fluss.server.coordinator.event.AddServerTagEvent; import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent; import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent; import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent; import org.apache.fluss.server.coordinator.event.EventManager; +import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent; import org.apache.fluss.server.entity.CommitKvSnapshotData; import org.apache.fluss.server.entity.LakeTieringTableInfo; import org.apache.fluss.server.kv.snapshot.CompletedSnapshot; @@ -107,12 +120,14 @@ import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; @@ -567,6 +582,54 @@ public CompletableFuture lakeTieringHeartbeat( return CompletableFuture.completedFuture(heartbeatResponse); } + @Override + public CompletableFuture 
addServerTag(AddServerTagRequest request) { + CompletableFuture response = new CompletableFuture<>(); + eventManagerSupplier + .get() + .put( + new AddServerTagEvent( + Arrays.stream(request.getServerIds()) + .boxed() + .collect(Collectors.toList()), + ServerTag.valueOf(request.getServerTag()), + response)); + return response; + } + + @Override + public CompletableFuture removeServerTag( + RemoveServerTagRequest request) { + CompletableFuture response = new CompletableFuture<>(); + eventManagerSupplier + .get() + .put( + new RemoveServerTagEvent( + Arrays.stream(request.getServerIds()) + .boxed() + .collect(Collectors.toList()), + ServerTag.valueOf(request.getServerTag()), + response)); + return response; + } + + @Override + public CompletableFuture rebalance(RebalanceRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + + @Override + public CompletableFuture cancelRebalance( + CancelRebalanceRequest request) { + throw new UnsupportedOperationException("Support soon!"); + } + private void validateHeartbeatRequest( PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) { if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) { @@ -650,4 +713,9 @@ private static List getBucketMetadataFromContext( }); return bucketMetadataList; } + + @Override + protected ServerMetadataCache getServerMetadataCache() { + return metadataCache; + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AddServerTagEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AddServerTagEvent.java new file mode 100644 index 0000000000..b6e7af8886 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/AddServerTagEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.rpc.messages.AddServerTagResponse; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** An event for add server tag. 
*/ +public class AddServerTagEvent implements CoordinatorEvent { + private final List serverIds; + private final ServerTag serverTag; + private final CompletableFuture respCallback; + + public AddServerTagEvent( + List serverIds, + ServerTag serverTag, + CompletableFuture respCallback) { + this.serverIds = serverIds; + this.serverTag = serverTag; + this.respCallback = respCallback; + } + + public List getServerIds() { + return serverIds; + } + + public ServerTag getServerTag() { + return serverTag; + } + + public CompletableFuture getRespCallback() { + return respCallback; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RemoveServerTagEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RemoveServerTagEvent.java new file mode 100644 index 0000000000..ede6fdeb0c --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/RemoveServerTagEvent.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.event; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** An event for remove server tag. */ +public class RemoveServerTagEvent implements CoordinatorEvent { + private final List serverIds; + private final ServerTag serverTag; + private final CompletableFuture respCallback; + + public RemoveServerTagEvent( + List serverIds, + ServerTag serverTag, + CompletableFuture respCallback) { + this.serverIds = serverIds; + this.serverTag = serverTag; + this.respCallback = respCallback; + } + + public List getServerIds() { + return serverIds; + } + + public ServerTag getServerTag() { + return serverTag; + } + + public CompletableFuture getRespCallback() { + return respCallback; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 3bf13aa02f..ebdbc9f969 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -71,6 +71,7 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.log.FetchParams; import org.apache.fluss.server.log.ListOffsetsParam; +import org.apache.fluss.server.metadata.ServerMetadataCache; import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.replica.ReplicaManager; import org.apache.fluss.server.utils.ServerRpcMessageUtils; @@ -480,4 +481,9 @@ private Set filterAuthorizedTables( }) .collect(Collectors.toSet()); } + + @Override + protected ServerMetadataCache getServerMetadataCache() { + return metadataCache; + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f77cf378ed..a3e25df0ca 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -53,6 +53,7 @@ import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest; import org.apache.fluss.rpc.messages.CreateAclsResponse; +import org.apache.fluss.rpc.messages.DescribeClusterResponse; import org.apache.fluss.rpc.messages.DropAclsResponse; import org.apache.fluss.rpc.messages.FetchLogRequest; import org.apache.fluss.rpc.messages.FetchLogResponse; @@ -270,6 +271,38 @@ public static MetadataResponse buildMetadataResponse( return metadataResponse; } + public static DescribeClusterResponse buildDescribeClusterResponse( + @Nullable ServerNode coordinatorServer, Set aliveTabletServers) { + DescribeClusterResponse describeClusterResponse = new DescribeClusterResponse(); + + if (coordinatorServer != null) { + describeClusterResponse + .setCoordinatorServer() + .setNodeId(coordinatorServer.id()) + .setHost(coordinatorServer.host()) + .setPort(coordinatorServer.port()); + } + + List pbServerNodeList = new ArrayList<>(); + for (ServerNode serverNode : aliveTabletServers) { + PbServerNode pbServerNode = + new PbServerNode() + .setNodeId(serverNode.id()) + .setHost(serverNode.host()) + .setPort(serverNode.port()); + if (serverNode.rack() != null) { + pbServerNode.setRack(serverNode.rack()); + } + if (serverNode.serverTag() != null) { + pbServerNode.setServerTag(serverNode.serverTag().value); + } + pbServerNodeList.add(pbServerNode); + } + + describeClusterResponse.addAllTabletServers(pbServerNodeList); + return describeClusterResponse; + } + public static UpdateMetadataRequest makeUpdateMetadataRequest( @Nullable ServerInfo coordinatorServer, Set aliveTableServers, diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index abdc5e1fdd..2542468628 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -35,8 +35,10 @@ import org.apache.fluss.server.zk.data.LakeTableSnapshot; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.RebalancePlan; import org.apache.fluss.server.zk.data.RemoteLogManifestHandle; import org.apache.fluss.server.zk.data.ResourceAcl; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.server.zk.data.TabletServerRegistration; @@ -55,11 +57,13 @@ import org.apache.fluss.server.zk.data.ZkData.PartitionSequenceIdZNode; import org.apache.fluss.server.zk.data.ZkData.PartitionZNode; import org.apache.fluss.server.zk.data.ZkData.PartitionsZNode; +import org.apache.fluss.server.zk.data.ZkData.RebalanceZNode; import org.apache.fluss.server.zk.data.ZkData.ResourceAclNode; import org.apache.fluss.server.zk.data.ZkData.SchemaZNode; import org.apache.fluss.server.zk.data.ZkData.SchemasZNode; import org.apache.fluss.server.zk.data.ZkData.ServerIdZNode; import org.apache.fluss.server.zk.data.ZkData.ServerIdsZNode; +import org.apache.fluss.server.zk.data.ZkData.ServerTagsZNode; import org.apache.fluss.server.zk.data.ZkData.TableIdZNode; import org.apache.fluss.server.zk.data.ZkData.TableSequenceIdZNode; import org.apache.fluss.server.zk.data.ZkData.TableZNode; @@ -902,6 +906,49 @@ public void insertAclChangeNotification(Resource resource) throws Exception { LOG.info("add acl change notification for resource {} ", resource); } + // 
-------------------------------------------------------------------------------------------- + // Maintenance + // -------------------------------------------------------------------------------------------- + + public void registerServerTags(ServerTags newServerTags) throws Exception { + String path = ServerTagsZNode.path(); + if (getOrEmpty(path).isPresent()) { + zkClient.setData().forPath(path, ServerTagsZNode.encode(newServerTags)); + } else { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, ServerTagsZNode.encode(newServerTags)); + } + } + + public Optional getServerTags() throws Exception { + String path = ServerTagsZNode.path(); + return getOrEmpty(path).map(ServerTagsZNode::decode); + } + + public void registerRebalancePlan(RebalancePlan rebalancePlan) throws Exception { + String path = RebalanceZNode.path(); + if (getOrEmpty(path).isPresent()) { + zkClient.setData().forPath(path, RebalanceZNode.encode(rebalancePlan)); + } else { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, RebalanceZNode.encode(rebalancePlan)); + } + } + + public Optional getRebalancePlan() throws Exception { + String path = RebalanceZNode.path(); + return getOrEmpty(path).map(RebalanceZNode::decode); + } + + public void deleteRebalancePlan() throws Exception { + String path = RebalanceZNode.path(); + deletePath(path); + } + // -------------------------------------------------------------------------------------------- // Utils // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java new file mode 100644 index 0000000000..00c87bcbc3 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlan.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * The generated rebalance plan for this cluster. + * + *

The latest execution rebalance plan will be stored in {@link ZkData.RebalanceZNode}. + * + * @see RebalancePlanJsonSerde for json serialization and deserialization. + */ +public class RebalancePlan { + + /** A mapping from table id to the RebalancePlanForBuckets of a non-partitioned table. */ + private final Map> planForBuckets; + + /** A mapping from table partition to the RebalancePlanForBuckets of a partitioned table. */ + private final Map> + planForBucketsOfPartitionedTable; + + public RebalancePlan(Map bucketPlan) { + this.planForBuckets = new HashMap<>(); + this.planForBucketsOfPartitionedTable = new HashMap<>(); + + for (Map.Entry entry : bucketPlan.entrySet()) { + TableBucket tableBucket = entry.getKey(); + RebalancePlanForBucket rebalancePlanForBucket = entry.getValue(); + if (tableBucket.getPartitionId() == null) { + planForBuckets + .computeIfAbsent(tableBucket.getTableId(), k -> new ArrayList<>()) + .add(rebalancePlanForBucket); + } else { + TablePartition tp = + new TablePartition(tableBucket.getTableId(), tableBucket.getPartitionId()); + planForBucketsOfPartitionedTable + .computeIfAbsent(tp, k -> new ArrayList<>()) + .add(rebalancePlanForBucket); + } + } + } + + public Map> getPlanForBuckets() { + return planForBuckets; + } + + public Map> getPlanForBucketsOfPartitionedTable() { + return planForBucketsOfPartitionedTable; + } + + @Override + public String toString() { + return "RebalancePlan{" + + "planForBuckets=" + + planForBuckets + + ", planForBucketsOfPartitionedTable=" + + planForBucketsOfPartitionedTable + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RebalancePlan that = (RebalancePlan) o; + + if (!Objects.equals(planForBuckets, that.planForBuckets)) { + return false; + } + return Objects.equals( + planForBucketsOfPartitionedTable, that.planForBucketsOfPartitionedTable); + } + + @Override + public int hashCode() { + return
Objects.hash(planForBuckets, planForBucketsOfPartitionedTable); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java new file mode 100644 index 0000000000..b79cd46e30 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerde.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePartition; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** Json serializer and deserializer for {@link RebalancePlan}. 
*/ +public class RebalancePlanJsonSerde + implements JsonSerializer, JsonDeserializer { + + public static final RebalancePlanJsonSerde INSTANCE = new RebalancePlanJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String REBALANCE_PLAN = "rebalance_plan"; + + private static final String TABLE_ID = "table_id"; + private static final String PARTITION_ID = "partition_id"; + + private static final String BUCKETS = "buckets"; + private static final String BUCKET_ID = "bucket_id"; + private static final String ORIGINAL_LEADER = "original_leader"; + private static final String NEW_LEADER = "new_leader"; + private static final String ORIGIN_REPLICAS = "origin_replicas"; + private static final String NEW_REPLICAS = "new_replicas"; + + private static final int VERSION = 1; + + @Override + public void serialize(RebalancePlan rebalancePlan, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + + generator.writeArrayFieldStart(REBALANCE_PLAN); + // first to write none-partitioned tables. + for (Map.Entry> entry : + rebalancePlan.getPlanForBuckets().entrySet()) { + generator.writeStartObject(); + generator.writeNumberField(TABLE_ID, entry.getKey()); + generator.writeArrayFieldStart(BUCKETS); + for (RebalancePlanForBucket bucketPlan : entry.getValue()) { + serializeRebalancePlanForBucket(generator, bucketPlan); + } + generator.writeEndArray(); + generator.writeEndObject(); + } + + // then to write partitioned tables. 
+ for (Map.Entry> entry : + rebalancePlan.getPlanForBucketsOfPartitionedTable().entrySet()) { + generator.writeStartObject(); + generator.writeNumberField(TABLE_ID, entry.getKey().getTableId()); + generator.writeNumberField(PARTITION_ID, entry.getKey().getPartitionId()); + generator.writeArrayFieldStart(BUCKETS); + for (RebalancePlanForBucket bucketPlan : entry.getValue()) { + serializeRebalancePlanForBucket(generator, bucketPlan); + } + generator.writeEndArray(); + generator.writeEndObject(); + } + + generator.writeEndArray(); + + generator.writeEndObject(); + } + + @Override + public RebalancePlan deserialize(JsonNode node) { + JsonNode rebalancePlanNode = node.get(REBALANCE_PLAN); + Map planForBuckets = new HashMap<>(); + + for (JsonNode tablePartitionPlanNode : rebalancePlanNode) { + long tableId = tablePartitionPlanNode.get(TABLE_ID).asLong(); + + Long partitionId = null; + if (tablePartitionPlanNode.has(PARTITION_ID)) { + partitionId = tablePartitionPlanNode.get(PARTITION_ID).asLong(); + } + + JsonNode bucketPlanNodes = tablePartitionPlanNode.get(BUCKETS); + for (JsonNode bucketPlanNode : bucketPlanNodes) { + int bucketId = bucketPlanNode.get(BUCKET_ID).asInt(); + TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); + + int originLeader = bucketPlanNode.get(ORIGINAL_LEADER).asInt(); + + int newLeader = bucketPlanNode.get(NEW_LEADER).asInt(); + + List originReplicas = new ArrayList<>(); + Iterator elements = bucketPlanNode.get(ORIGIN_REPLICAS).elements(); + while (elements.hasNext()) { + originReplicas.add(elements.next().asInt()); + } + + List newReplicas = new ArrayList<>(); + elements = bucketPlanNode.get(NEW_REPLICAS).elements(); + while (elements.hasNext()) { + newReplicas.add(elements.next().asInt()); + } + + planForBuckets.put( + tableBucket, + new RebalancePlanForBucket( + tableBucket, originLeader, newLeader, originReplicas, newReplicas)); + } + } + + return new RebalancePlan(planForBuckets); + } + + private void 
serializeRebalancePlanForBucket( + JsonGenerator generator, RebalancePlanForBucket bucketPlan) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(BUCKET_ID, bucketPlan.getBucketId()); + generator.writeNumberField(ORIGINAL_LEADER, bucketPlan.getOriginalLeader()); + generator.writeNumberField(NEW_LEADER, bucketPlan.getNewLeader()); + generator.writeArrayFieldStart(ORIGIN_REPLICAS); + for (Integer replica : bucketPlan.getOriginReplicas()) { + generator.writeNumber(replica); + } + generator.writeEndArray(); + generator.writeArrayFieldStart(NEW_REPLICAS); + for (Integer replica : bucketPlan.getNewReplicas()) { + generator.writeNumber(replica); + } + generator.writeEndArray(); + generator.writeEndObject(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java new file mode 100644 index 0000000000..edddcaaf75 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTags.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; + +import java.util.Map; +import java.util.Objects; + +/** + * The latest {@link ServerTags} of tabletServers in {@link ZkData.ServerTagsZNode}. It is used to + * store the serverTags information in zookeeper. + * + * @see ServerTagsJsonSerde for json serialization and deserialization. + */ +public class ServerTags { + + // a mapping from tabletServer id to serverTag. + private final Map serverTags; + + public ServerTags(Map serverTags) { + this.serverTags = serverTags; + } + + public Map getServerTags() { + return serverTags; + } + + @Override + public String toString() { + return "ServerTags{" + "serverTags=" + serverTags + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerTags that = (ServerTags) o; + return Objects.equals(serverTags, that.serverTags); + } + + @Override + public int hashCode() { + return Objects.hash(serverTags); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java new file mode 100644 index 0000000000..7df94c74e4 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerde.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** Json serializer and deserializer for {@link ServerTags}. */ +public class ServerTagsJsonSerde + implements JsonSerializer, JsonDeserializer { + + public static final ServerTagsJsonSerde INSTANCE = new ServerTagsJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String SERVER_TAGS = "server_tags"; + private static final int VERSION = 1; + + @Override + public void serialize(ServerTags serverTags, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + generator.writeObjectFieldStart(SERVER_TAGS); + for (Map.Entry entry : serverTags.getServerTags().entrySet()) { + generator.writeNumberField(String.valueOf(entry.getKey()), entry.getValue().value); + } + generator.writeEndObject(); + + generator.writeEndObject(); + } + + @Override + public ServerTags deserialize(JsonNode node) { + JsonNode serverTagsNode = node.get(SERVER_TAGS); + Map serverTags = new HashMap<>(); + Iterator fieldNames = serverTagsNode.fieldNames(); + while (fieldNames.hasNext()) { + String 
serverId = fieldNames.next(); + serverTags.put( + Integer.valueOf(serverId), + ServerTag.valueOf(serverTagsNode.get(serverId).asInt())); + } + return new ServerTags(serverTags); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 9fa256d1cd..6d60675d85 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -554,6 +554,25 @@ public static LakeTableSnapshot decode(byte[] json) { } } + /** + * The znode for server tags. The znode path is: + * + *

/tabletservers/server_tags + */ + public static final class ServerTagsZNode { + public static String path() { + return "/tabletservers/server_tags"; + } + + public static byte[] encode(ServerTags serverTag) { + return JsonSerdeUtils.writeValueAsBytes(serverTag, ServerTagsJsonSerde.INSTANCE); + } + + public static ServerTags decode(byte[] json) { + return JsonSerdeUtils.readValue(json, ServerTagsJsonSerde.INSTANCE); + } + } + // ------------------------------------------------------------------------------------------ // ZNodes for ACL(Access Control List). // ------------------------------------------------------------------------------------------ @@ -649,4 +668,27 @@ public static Resource decode(byte[] json) { } } } + + // ------------------------------------------------------------------------------------------ + // ZNodes under "/cluster/" + // ------------------------------------------------------------------------------------------ + + /** + * The znode for rebalance. The znode path is: + * + *

/cluster/rebalance + */ + public static final class RebalanceZNode { + public static String path() { + return "/cluster/rebalance"; + } + + public static byte[] encode(RebalancePlan rebalancePlan) { + return JsonSerdeUtils.writeValueAsBytes(rebalancePlan, RebalancePlanJsonSerde.INSTANCE); + } + + public static RebalancePlan decode(byte[] json) { + return JsonSerdeUtils.readValue(json, RebalancePlanJsonSerde.INSTANCE); + } + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index 1f7509738a..20d354775b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -20,10 +20,14 @@ import org.apache.fluss.exception.FencedLeaderEpochException; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.messages.AddServerTagRequest; +import org.apache.fluss.rpc.messages.AddServerTagResponse; import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.AdjustIsrResponse; import org.apache.fluss.rpc.messages.ApiVersionsRequest; import org.apache.fluss.rpc.messages.ApiVersionsResponse; +import org.apache.fluss.rpc.messages.CancelRebalanceRequest; +import org.apache.fluss.rpc.messages.CancelRebalanceResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; @@ -40,6 +44,8 @@ import org.apache.fluss.rpc.messages.CreateTableResponse; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeClusterRequest; +import 
org.apache.fluss.rpc.messages.DescribeClusterResponse; import org.apache.fluss.rpc.messages.DropAclsRequest; import org.apache.fluss.rpc.messages.DropAclsResponse; import org.apache.fluss.rpc.messages.DropDatabaseRequest; @@ -70,10 +76,16 @@ import org.apache.fluss.rpc.messages.ListDatabasesResponse; import org.apache.fluss.rpc.messages.ListPartitionInfosRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosResponse; +import org.apache.fluss.rpc.messages.ListRebalanceProcessRequest; +import org.apache.fluss.rpc.messages.ListRebalanceProcessResponse; import org.apache.fluss.rpc.messages.ListTablesRequest; import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.RebalanceRequest; +import org.apache.fluss.rpc.messages.RebalanceResponse; +import org.apache.fluss.rpc.messages.RemoveServerTagRequest; +import org.apache.fluss.rpc.messages.RemoveServerTagResponse; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; import org.apache.fluss.rpc.protocol.ApiError; @@ -296,11 +308,45 @@ public CompletableFuture lakeTieringHeartbeat( throw new UnsupportedOperationException(); } + @Override + public CompletableFuture addServerTag(AddServerTagRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture removeServerTag( + RemoveServerTagRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture rebalance(RebalanceRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture listRebalanceProcess( + ListRebalanceProcessRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture cancelRebalance( + CancelRebalanceRequest request) { + throw new UnsupportedOperationException(); + 
} + @Override public CompletableFuture listAcls(ListAclsRequest request) { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture describeCluster( + DescribeClusterRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture createAcls(CreateAclsRequest request) { throw new UnsupportedOperationException(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index 6311659237..622ef6a23b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -26,6 +26,8 @@ import org.apache.fluss.rpc.messages.ApiVersionsResponse; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeClusterRequest; +import org.apache.fluss.rpc.messages.DescribeClusterResponse; import org.apache.fluss.rpc.messages.FetchLogRequest; import org.apache.fluss.rpc.messages.FetchLogResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; @@ -318,6 +320,12 @@ public CompletableFuture listAcls(ListAclsRequest request) { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture describeCluster( + DescribeClusterRequest request) { + throw new UnsupportedOperationException(); + } + public int pendingRequestSize() { return requests.size(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index 28d33f70d0..653d32a385 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ 
-19,6 +19,8 @@ import org.apache.fluss.cluster.Endpoint; import org.apache.fluss.cluster.TabletServerInfo; +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.cluster.rebalance.ServerTag; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.metadata.Schema; @@ -33,6 +35,8 @@ import org.apache.fluss.server.zk.data.CoordinatorAddress; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.server.zk.data.PartitionAssignment; +import org.apache.fluss.server.zk.data.RebalancePlan; +import org.apache.fluss.server.zk.data.ServerTags; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; import org.apache.fluss.server.zk.data.TabletServerRegistration; @@ -503,6 +507,80 @@ void testPartition() throws Exception { assertThat(partitions).containsExactly("p2"); } + @Test + void testServerTag() throws Exception { + Map serverTags = new HashMap<>(); + serverTags.put(0, ServerTag.PERMANENT_OFFLINE); + serverTags.put(1, ServerTag.TEMPORARY_OFFLINE); + + zookeeperClient.registerServerTags(new ServerTags(serverTags)); + assertThat(zookeeperClient.getServerTags()).hasValue(new ServerTags(serverTags)); + + // update server tags. 
+ serverTags.put(0, ServerTag.TEMPORARY_OFFLINE); + serverTags.remove(1); + zookeeperClient.registerServerTags(new ServerTags(serverTags)); + assertThat(zookeeperClient.getServerTags()).hasValue(new ServerTags(serverTags)); + + zookeeperClient.registerServerTags(new ServerTags(Collections.emptyMap())); + assertThat(zookeeperClient.getServerTags()) + .hasValue(new ServerTags(Collections.emptyMap())); + } + + @Test + void testRebalancePlan() throws Exception { + Map bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(0L, 1), + new RebalancePlanForBucket( + new TableBucket(0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + bucketPlan.put( + new TableBucket(1L, 1L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 0), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + bucketPlan.put( + new TableBucket(1L, 1L, 1), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + zookeeperClient.registerRebalancePlan(new RebalancePlan(bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()).hasValue(new RebalancePlan(bucketPlan)); + + bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + zookeeperClient.registerRebalancePlan(new RebalancePlan(bucketPlan)); + assertThat(zookeeperClient.getRebalancePlan()).hasValue(new RebalancePlan(bucketPlan)); + + zookeeperClient.deleteRebalancePlan(); + assertThat(zookeeperClient.getRebalancePlan()).isEmpty(); + } + @Test void testZookeeperConfigPath() throws Exception { final Configuration config = new Configuration(); diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java new file mode 100644 index 0000000000..64b6da43e5 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/RebalancePlanJsonSerdeTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.json.JsonSerdeTestBase; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** Test for {@link RebalancePlanJsonSerde}. 
*/ +public class RebalancePlanJsonSerdeTest extends JsonSerdeTestBase { + + RebalancePlanJsonSerdeTest() { + super(RebalancePlanJsonSerde.INSTANCE); + } + + @Override + protected RebalancePlan[] createObjects() { + Map bucketPlan = new HashMap<>(); + bucketPlan.put( + new TableBucket(0L, 0), + new RebalancePlanForBucket( + new TableBucket(0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(0L, 1), + new RebalancePlanForBucket( + new TableBucket(0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + + bucketPlan.put( + new TableBucket(1L, 0L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 0L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + bucketPlan.put( + new TableBucket(1L, 0L, 1), + new RebalancePlanForBucket( + new TableBucket(1L, 0L, 1), + 1, + 1, + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3))); + + bucketPlan.put( + new TableBucket(1L, 1L, 0), + new RebalancePlanForBucket( + new TableBucket(1L, 1L, 0), + 0, + 3, + Arrays.asList(0, 1, 2), + Arrays.asList(3, 4, 5))); + return new RebalancePlan[] {new RebalancePlan(bucketPlan)}; + } + + @Override + protected String[] expectedJsons() { + return new String[] { + "{\"version\":1,\"rebalance_plan\":" + + "[{\"table_id\":0,\"buckets\":" + + "[{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3]}," + + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}]}," + + "{\"table_id\":1,\"partition_id\":0,\"buckets\":[" + + "{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}," + + "{\"bucket_id\":1,\"original_leader\":1,\"new_leader\":1,\"origin_replicas\":[0,1,2],\"new_replicas\":[1,2,3]}]}," + + "{\"table_id\":1,\"partition_id\":1,\"buckets\":[" + + 
"{\"bucket_id\":0,\"original_leader\":0,\"new_leader\":3,\"origin_replicas\":[0,1,2],\"new_replicas\":[3,4,5]}]}]}" + }; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java new file mode 100644 index 0000000000..8dd4e7f454 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ServerTagsJsonSerdeTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.cluster.rebalance.ServerTag; +import org.apache.fluss.utils.json.JsonSerdeTestBase; + +import java.util.HashMap; +import java.util.Map; + +/** Test for {@link ServerTagsJsonSerde}. 
*/ +public class ServerTagsJsonSerdeTest extends JsonSerdeTestBase { + + ServerTagsJsonSerdeTest() { + super(ServerTagsJsonSerde.INSTANCE); + } + + @Override + protected ServerTags[] createObjects() { + Map serverTags = new HashMap<>(); + serverTags.put(0, ServerTag.PERMANENT_OFFLINE); + serverTags.put(1, ServerTag.TEMPORARY_OFFLINE); + + Map serverTags2 = new HashMap<>(); + + return new ServerTags[] {new ServerTags(serverTags), new ServerTags(serverTags2)}; + } + + protected String[] expectedJsons() { + return new String[] { + "{\"version\":1,\"server_tags\":{\"0\":0,\"1\":1}}", + "{\"version\":1,\"server_tags\":{}}" + }; + } +}