@@ -22,7 +22,12 @@
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.exception.AuthorizationException;
import org.apache.fluss.exception.DatabaseAlreadyExistException;
import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
@@ -32,10 +32,15 @@
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.KvSnapshotNotExistException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.exception.NoRebalanceInProgressException;
import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.RebalanceFailureException;
import org.apache.fluss.exception.SchemaNotExistException;
import org.apache.fluss.exception.ServerNotExistException;
import org.apache.fluss.exception.ServerTagAlreadyExistException;
import org.apache.fluss.exception.ServerTagNotExistException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
@@ -56,6 +66,7 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
@@ -452,4 +463,90 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);

/**
* Add a server tag to the specified tabletServers. A tabletServer can have at most one serverTag.
*
* <p>If adding the tag fails for any of the tabletServers, none of the tags will take effect.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
* access to the cluster.
* <li>{@link ServerNotExistException} If any tabletServer in {@code tabletServers} does not
* exist.
* <li>{@link ServerTagAlreadyExistException} If the server tag already exists on one of the
* tabletServers.
* </ul>
*
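* <p>A minimal usage sketch, assuming an {@code Admin} instance {@code admin}; the {@code
* ServerTag} constant below is a placeholder for illustration and may not exist:
*
* <pre>{@code
* // tag tabletServers 1 and 2, e.g. before taking them offline for maintenance
* admin.addServerTag(Arrays.asList(1, 2), ServerTag.PERMANENT_OFFLINE).get();
* }</pre>
*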
* @param tabletServers the tabletServers to add the server tag to.
* @param serverTag the server tag to be added.
*/
CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
* Remove the server tag from the specified tabletServers.
*
* <p>If removing the tag fails for any of the tabletServers, none of the tags will be removed.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
* access to the cluster.
* <li>{@link ServerNotExistException} If any tabletServer in {@code tabletServers} does not
* exist.
* <li>{@link ServerTagNotExistException} If the server tag does not exist on one of the
* tabletServers.
* </ul>
*
* @param tabletServers the tabletServers to remove the server tag from.
* @param serverTag the server tag to be removed.
*/
CompletableFuture<Void> removeServerTag(List<Integer> tabletServers, ServerTag serverTag);

/**
* Based on the provided {@code priorityGoals}, Fluss performs load balancing of the cluster's
* bucket load.
*
* <p>In more detail, Fluss collects the cluster's load information and computes an optimized
* bucket placement according to the user-defined {@code priorityGoals}.
*
* <p>Currently, Fluss supports only one active rebalance task in the cluster. If an uncompleted
* rebalance task exists, a {@link RebalanceFailureException} will be thrown.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
* access to the cluster.
* <li>{@link RebalanceFailureException} If the rebalance fails, for example because there is
* an ongoing rebalance execution.
* </ul>
*
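* <p>A minimal usage sketch, assuming an {@code Admin} instance {@code admin}; the {@code
* GoalType} constant below is a placeholder for illustration and may not exist:
*
* <pre>{@code
* // dry run: compute the rebalance proposal without executing it
* Map<TableBucket, RebalancePlanForBucket> plan =
*         admin.rebalance(Collections.singletonList(GoalType.REPLICA_DISTRIBUTION), true).get();
* // execute the same goals for real
* admin.rebalance(Collections.singletonList(GoalType.REPLICA_DISTRIBUTION), false).get();
* }</pre>
*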
* @param priorityGoals the goals to optimize for, in priority order.
* @param dryRun if true, calculate and return the rebalance optimization proposal but do not
* execute it.
* @return the generated rebalance plan for all tableBuckets that need to be rebalanced.
*/
CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
List<GoalType> priorityGoals, boolean dryRun);

/**
* List the progress of the ongoing rebalance task.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
* access to the cluster.
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
* </ul>
*
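* <p>A minimal usage sketch, assuming an {@code Admin} instance {@code admin}:
*
* <pre>{@code
* Map<TableBucket, RebalanceResultForBucket> progress = admin.listRebalanceProcess().get();
* progress.forEach((bucket, result) -> System.out.println(bucket + " -> " + result));
* }</pre>
*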
* @return the rebalance result for each tableBucket that is being rebalanced.
*/
CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess();

/**
* Cancel the ongoing rebalance task.
*
* <ul>
* <li>{@link AuthorizationException} If the authenticated user doesn't have reset config
* access to the cluster.
* <li>{@link NoRebalanceInProgressException} If there are no rebalance tasks in progress.
* </ul>
*/
CompletableFuture<Void> cancelRebalance();
}
@@ -22,8 +22,11 @@
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.client.utils.ClientRpcMessageUtils;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.rebalance.GoalType;
import org.apache.fluss.cluster.rebalance.RebalancePlanForBucket;
import org.apache.fluss.cluster.rebalance.RebalanceResultForBucket;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
@@ -41,6 +44,7 @@
import org.apache.fluss.rpc.gateway.AdminGateway;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.AddServerTagRequest;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
import org.apache.fluss.rpc.messages.CreateTableRequest;
@@ -65,6 +69,7 @@
import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
import org.apache.fluss.rpc.messages.PbPartitionSpec;
import org.apache.fluss.rpc.messages.PbTablePath;
import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
import org.apache.fluss.rpc.messages.TableExistsRequest;
import org.apache.fluss.rpc.messages.TableExistsResponse;
import org.apache.fluss.rpc.protocol.ApiError;
@@ -86,7 +91,7 @@
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
import static org.apache.fluss.client.utils.MetadataUtils.sendMetadataRequestAndRebuildCluster;
import static org.apache.fluss.client.utils.MetadataUtils.sendDescribeClusterRequest;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclBindingFilters;
import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toPbAclFilter;
@@ -120,17 +125,8 @@ public CompletableFuture<List<ServerNode>> getServerNodes() {
CompletableFuture.runAsync(
() -> {
try {
List<ServerNode> serverNodeList = new ArrayList<>();
Cluster cluster =
sendMetadataRequestAndRebuildCluster(
readOnlyGateway,
false,
metadataUpdater.getCluster(),
null,
null,
null);
serverNodeList.add(cluster.getCoordinatorServer());
serverNodeList.addAll(cluster.getAliveTabletServerList());
List<ServerNode> serverNodeList =
sendDescribeClusterRequest(readOnlyGateway);
future.complete(serverNodeList);
} catch (Throwable t) {
future.completeExceptionally(t);
@@ -464,6 +460,37 @@ public DropAclsResult dropAcls(Collection<AclBindingFilter> filters) {
return result;
}

@Override
public CompletableFuture<Void> addServerTag(List<Integer> tabletServers, ServerTag serverTag) {
AddServerTagRequest request = new AddServerTagRequest().setServerTag(serverTag.value);
tabletServers.forEach(request::addServerId);
return gateway.addServerTag(request).thenApply(r -> null);
}

@Override
public CompletableFuture<Void> removeServerTag(
List<Integer> tabletServers, ServerTag serverTag) {
RemoveServerTagRequest request = new RemoveServerTagRequest().setServerTag(serverTag.value);
tabletServers.forEach(request::addServerId);
return gateway.removeServerTag(request).thenApply(r -> null);
}

@Override
public CompletableFuture<Map<TableBucket, RebalancePlanForBucket>> rebalance(
List<GoalType> priorityGoals, boolean dryRun) {
throw new UnsupportedOperationException("Will be supported soon.");
}

@Override
public CompletableFuture<Map<TableBucket, RebalanceResultForBucket>> listRebalanceProcess() {
throw new UnsupportedOperationException("Will be supported soon.");
}

@Override
public CompletableFuture<Void> cancelRebalance() {
throw new UnsupportedOperationException("Will be supported soon.");
}

@Override
public void close() {
// nothing to do yet
@@ -21,6 +21,7 @@
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.cluster.rebalance.ServerTag;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.StaleMetadataException;
import org.apache.fluss.metadata.PhysicalTablePath;
@@ -31,6 +32,8 @@
import org.apache.fluss.rpc.GatewayClientProxy;
import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.messages.DescribeClusterRequest;
import org.apache.fluss.rpc.messages.DescribeClusterResponse;
import org.apache.fluss.rpc.messages.MetadataRequest;
import org.apache.fluss.rpc.messages.MetadataResponse;
import org.apache.fluss.rpc.messages.PbBucketMetadata;
@@ -165,6 +168,22 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
// time out here
}

public static List<ServerNode> sendDescribeClusterRequest(AdminReadOnlyGateway gateway)
throws ExecutionException, InterruptedException, TimeoutException {
DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest();
return gateway.describeCluster(describeClusterRequest)
.thenApply(
response -> {
List<ServerNode> serverNodes = new ArrayList<>();
serverNodes.add(getCoordinatorServer(response));
serverNodes.addAll(getAliveTabletServers(response));
return serverNodes;
})
.get(30, TimeUnit.SECONDS); // TODO currently, we don't have timeout logic in
// RpcClient, it will let the get() block forever. So we
// time out here
}

private static NewTableMetadata getTableMetadataToUpdate(
Cluster cluster, MetadataResponse metadataResponse) {
Map<TablePath, Long> newTablePathToTableId = new HashMap<>();
@@ -277,6 +296,19 @@ private static ServerNode getCoordinatorServer(MetadataResponse response) {
}
}

private static ServerNode getCoordinatorServer(DescribeClusterResponse response) {
if (!response.hasCoordinatorServer()) {
return null;
} else {
PbServerNode protoServerNode = response.getCoordinatorServer();
return new ServerNode(
protoServerNode.getNodeId(),
protoServerNode.getHost(),
protoServerNode.getPort(),
ServerType.COORDINATOR);
}
}

private static Map<Integer, ServerNode> getAliveTabletServers(MetadataResponse response) {
Map<Integer, ServerNode> aliveTabletServers = new HashMap<>();
response.getTabletServersList()
@@ -295,6 +327,25 @@ private static Map<Integer, ServerNode> getAliveTabletServers(MetadataResponse r
return aliveTabletServers;
}

public static List<ServerNode> getAliveTabletServers(DescribeClusterResponse response) {
List<ServerNode> aliveTabletServers = new ArrayList<>();
response.getTabletServersList()
.forEach(
serverNode -> {
aliveTabletServers.add(
new ServerNode(
serverNode.getNodeId(),
serverNode.getHost(),
serverNode.getPort(),
ServerType.TABLET_SERVER,
serverNode.hasRack() ? serverNode.getRack() : null,
serverNode.hasServerTag()
? ServerTag.valueOf(serverNode.getServerTag())
: null));
});
return aliveTabletServers;
}

private static List<BucketLocation> toBucketLocations(
TablePath tablePath,
long tableId,