diff --git a/fluss-common/src/main/java/com/alibaba/fluss/cluster/MetadataCache.java b/fluss-common/src/main/java/com/alibaba/fluss/cluster/MetadataCache.java
deleted file mode 100644
index f18c1487de..0000000000
--- a/fluss-common/src/main/java/com/alibaba/fluss/cluster/MetadataCache.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.cluster;
-
-import com.alibaba.fluss.annotation.Internal;
-import com.alibaba.fluss.metadata.PhysicalTablePath;
-
-import javax.annotation.Nullable;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * The metadata cache to cache the cluster metadata info.
- *
- *
- * <p>Note: For this interface we only support caching cluster server node info.
- */
-@Internal
-public interface MetadataCache {
-
- /**
- * Get the coordinator server node.
- *
- * @return the coordinator server node
- */
- Optional<ServerNode> getCoordinatorServer(String listenerName);
-
- /**
- * Check whether the tablet server id related tablet server node is alive.
- *
- * @param serverId the tablet server id
- * @return true if the server is alive, false otherwise
- */
- boolean isAliveTabletServer(int serverId);
-
- /**
- * Get the tablet server.
- *
- * @param serverId the tablet server id
- * @return the tablet server node
- */
- Optional<ServerNode> getTabletServer(int serverId, String listenerName);
-
- /**
- * Get all alive tablet server nodes.
- *
- * @return all alive tablet server nodes
- */
- Map<Integer, ServerNode> getAllAliveTabletServers(String listenerName);
-
- Set<TabletServerInfo> getAliveTabletServerInfos();
-
- @Nullable
- PhysicalTablePath getTablePath(long tableId);
-
- /** Get ids of all alive tablet server nodes. */
- default TabletServerInfo[] getLiveServers() {
- Set<TabletServerInfo> aliveTabletServerInfos = getAliveTabletServerInfos();
- TabletServerInfo[] server = new TabletServerInfo[aliveTabletServerInfos.size()];
- Iterator<TabletServerInfo> iterator = aliveTabletServerInfos.iterator();
- for (int i = 0; i < aliveTabletServerInfos.size(); i++) {
- server[i] = iterator.next();
- }
- return server;
- }
-}
diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java
index 2a078d3177..ad22f1560e 100644
--- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java
+++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java
@@ -45,8 +45,6 @@
import com.alibaba.fluss.rpc.messages.MetadataResponse;
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
-import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest;
-import com.alibaba.fluss.rpc.messages.UpdateMetadataResponse;
import com.alibaba.fluss.rpc.protocol.ApiKeys;
import com.alibaba.fluss.rpc.protocol.RPC;
@@ -129,15 +127,6 @@ public interface AdminReadOnlyGateway extends RpcGateway {
@RPC(api = ApiKeys.GET_METADATA)
CompletableFuture<MetadataResponse> metadata(MetadataRequest request);
- /**
- * request send to tablet server to update the metadata cache for every tablet server node,
- * asynchronously.
- *
- * @return the update metadata response
- */
- @RPC(api = ApiKeys.UPDATE_METADATA)
- CompletableFuture<UpdateMetadataResponse> updateMetadata(UpdateMetadataRequest request);
-
/**
* Get the latest kv snapshots of a primary key table. A kv snapshot is a snapshot of a kv
* tablet, so a table can have multiple kv snapshots.
diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java
index ba99eb8936..45139e85ef 100644
--- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java
+++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java
@@ -43,6 +43,8 @@
import com.alibaba.fluss.rpc.messages.PutKvResponse;
import com.alibaba.fluss.rpc.messages.StopReplicaRequest;
import com.alibaba.fluss.rpc.messages.StopReplicaResponse;
+import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest;
+import com.alibaba.fluss.rpc.messages.UpdateMetadataResponse;
import com.alibaba.fluss.rpc.protocol.ApiKeys;
import com.alibaba.fluss.rpc.protocol.RPC;
@@ -60,6 +62,15 @@ public interface TabletServerGateway extends RpcGateway, AdminReadOnlyGateway {
CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest);
+ /**
+ * Request sent to a tablet server to update its metadata cache. The coordinator sends this
+ * request to every tablet server node, asynchronously.
+ *
+ * @return the update metadata response
+ */
+ @RPC(api = ApiKeys.UPDATE_METADATA)
+ CompletableFuture<UpdateMetadataResponse> updateMetadata(UpdateMetadataRequest request);
+
/**
* Stop replica.
*
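A hedged sketch of how a caller might use the RPC now exposed on `TabletServerGateway` (the gateway variable and logger below are illustrative; in the server the request is actually built via `ServerRpcMessageUtils.makeUpdateMetadataRequest`, shown later in this patch):

```java
// Illustrative call site only: the coordinator pushes its current cluster view to one
// tablet server; failures are logged and the call completes asynchronously.
UpdateMetadataRequest request = new UpdateMetadataRequest(); // assumed no-arg ctor, as with UpdateMetadataResponse
tabletServerGateway
        .updateMetadata(request)
        .whenComplete(
                (response, error) -> {
                    if (error != null) {
                        LOG.warn("Failed to update metadata on tablet server.", error);
                    }
                });
```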
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto
index a1cb2a9ba2..93a5b8b155 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -170,6 +170,8 @@ message UpdateMetadataRequest {
optional PbServerNode coordinator_server = 1;
repeated PbServerNode tablet_servers = 2;
repeated PbTableMetadata table_metadata = 3;
+ repeated PbPartitionMetadata partition_metadata = 4;
+ optional int32 coordinator_epoch = 5;
}
message UpdateMetadataResponse {
@@ -565,6 +567,9 @@ message PbTableMetadata {
repeated PbBucketMetadata bucket_metadata = 5;
required int64 created_time = 6;
required int64 modified_time = 7;
+
+ // TODO add a new field 'deleted_table' to indicate this table is deleted in UpdateMetadataRequest.
+ // trace by: https://github.com/alibaba/fluss/issues/981
}
message PbPartitionMetadata {
@@ -581,6 +586,7 @@ message PbBucketMetadata {
optional int32 leader_id = 2;
repeated int32 replica_id = 3 [packed = true];
// TODO: Add isr here.
+ optional int32 leader_epoch = 4;
}
message PbProduceLogReqForBucket {
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java
index aa69d9ac68..4b68dcb79e 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java
@@ -16,7 +16,6 @@
package com.alibaba.fluss.server;
-import com.alibaba.fluss.cluster.BucketLocation;
import com.alibaba.fluss.cluster.ServerNode;
import com.alibaba.fluss.cluster.ServerType;
import com.alibaba.fluss.exception.FlussRuntimeException;
@@ -71,8 +70,6 @@
import com.alibaba.fluss.rpc.messages.PbTablePath;
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
-import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest;
-import com.alibaba.fluss.rpc.messages.UpdateMetadataResponse;
import com.alibaba.fluss.rpc.protocol.ApiKeys;
import com.alibaba.fluss.rpc.protocol.ApiManager;
import com.alibaba.fluss.security.acl.AclBinding;
@@ -83,10 +80,10 @@
import com.alibaba.fluss.server.coordinator.CoordinatorService;
import com.alibaba.fluss.server.coordinator.MetadataManager;
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
-import com.alibaba.fluss.server.metadata.ClusterMetadataInfo;
-import com.alibaba.fluss.server.metadata.PartitionMetadataInfo;
+import com.alibaba.fluss.server.metadata.BucketMetadata;
+import com.alibaba.fluss.server.metadata.PartitionMetadata;
import com.alibaba.fluss.server.metadata.ServerMetadataCache;
-import com.alibaba.fluss.server.metadata.TableMetadataInfo;
+import com.alibaba.fluss.server.metadata.TableMetadata;
import com.alibaba.fluss.server.tablet.TabletService;
import com.alibaba.fluss.server.zk.ZooKeeperClient;
import com.alibaba.fluss.server.zk.data.BucketAssignment;
@@ -113,6 +110,7 @@
import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclFilter;
import static com.alibaba.fluss.security.acl.Resource.TABLE_SPLITTER;
+import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.buildMetadataResponse;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeGetLatestKvSnapshotsResponse;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeGetLatestLakeSnapshotResponse;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeKvSnapshotMetadataResponse;
@@ -304,8 +302,8 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
List partitions = request.getPartitionsPathsList();
long[] partitionIds = request.getPartitionsIds();
- List<TableMetadataInfo> tableMetadataInfos = new ArrayList<>();
- List<PartitionMetadataInfo> partitionMetadataInfos = new ArrayList<>();
+ List<TableMetadata> tableMetadata = new ArrayList<>();
+ List<PartitionMetadata> partitionMetadata = new ArrayList<>();
for (PbTablePath pbTablePath : pbTablePaths) {
if (authorizer == null
@@ -316,7 +314,7 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
pbTablePath.getDatabaseName(), pbTablePath.getTableName()))) {
TablePath tablePath = toTablePath(pbTablePath);
tablePaths.add(tablePath);
- tableMetadataInfos.add(getTableMetadata(tablePath, listenerName));
+ tableMetadata.add(getTableMetadata(tablePath));
}
}
@@ -328,27 +326,19 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
Resource.table(
partitionPath.getDatabaseName(),
partitionPath.getTableName()))) {
- partitionMetadataInfos.add(
- getPartitionMetadata(toPhysicalTablePath(partitionPath), listenerName));
+ partitionMetadata.add(getPartitionMetadata(toPhysicalTablePath(partitionPath)));
}
}
- // get partition info from partition ids
- partitionMetadataInfos.addAll(getPartitionMetadata(tablePaths, partitionIds, listenerName));
+ // get partition metadata from partition ids
+ partitionMetadata.addAll(getPartitionMetadata(tablePaths, partitionIds));
return CompletableFuture.completedFuture(
- ClusterMetadataInfo.toMetadataResponse(
+ buildMetadataResponse(
metadataCache.getCoordinatorServer(listenerName),
aliveTableServers,
- tableMetadataInfos,
- partitionMetadataInfos));
- }
-
- @Override
- public CompletableFuture updateMetadata(UpdateMetadataRequest request) {
- UpdateMetadataResponse updateMetadataResponse = new UpdateMetadataResponse();
- metadataCache.updateClusterMetadata(ClusterMetadataInfo.fromUpdateMetadataRequest(request));
- return CompletableFuture.completedFuture(updateMetadataResponse);
+ tableMetadata,
+ partitionMetadata));
}
@Override
@@ -515,7 +505,7 @@ public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request) {
return CompletableFuture.completedFuture(makeListAclsResponse(acls));
} catch (Exception e) {
throw new FlussRuntimeException(
- String.format("Failed to list acls for resource: ", aclBindingFilter), e);
+ String.format("Failed to list acls for resource: %s", aclBindingFilter), e);
}
}
@@ -524,65 +514,57 @@ private Set<ServerNode> getAllTabletServerNodes(String listenerName) {
}
/**
- * Returned a {@link TableMetadataInfo} contains the table info for {@code tablePath} and the
- * bucket locations for {@code physicalTablePaths}.
+ * Returns a {@link TableMetadata} that contains the table info for {@code tablePath} and the
+ * metadata of its buckets.
*/
- private TableMetadataInfo getTableMetadata(TablePath tablePath, String listenerName) {
+ private TableMetadata getTableMetadata(TablePath tablePath) {
TableInfo tableInfo = metadataManager.getTable(tablePath);
long tableId = tableInfo.getTableId();
try {
AssignmentInfo assignmentInfo =
getAssignmentInfo(tableId, PhysicalTablePath.of(tablePath));
- List<BucketLocation> bucketLocations = new ArrayList<>();
+ List<BucketMetadata> bucketMetadataList = new ArrayList<>();
if (assignmentInfo.tableAssignment != null) {
TableAssignment tableAssignment = assignmentInfo.tableAssignment;
- bucketLocations =
- toBucketLocations(
- PhysicalTablePath.of(tablePath),
- tableId,
- null,
- tableAssignment,
- listenerName);
+ bucketMetadataList = toBucketMetadataList(tableId, null, tableAssignment, null);
} else {
if (!tableInfo.isPartitioned()) {
LOG.warn("No table assignment node found for table {}", tableId);
}
}
- return new TableMetadataInfo(tableInfo, bucketLocations);
+ return new TableMetadata(tableInfo, bucketMetadataList);
} catch (Exception e) {
throw new FlussRuntimeException(
String.format("Failed to get metadata for %s", tablePath), e);
}
}
- private PartitionMetadataInfo getPartitionMetadata(
- PhysicalTablePath partitionPath, String listenerName) {
+ private PartitionMetadata getPartitionMetadata(PhysicalTablePath partitionPath) {
try {
checkNotNull(
partitionPath.getPartitionName(),
"partitionName must be not null, but get: " + partitionPath);
AssignmentInfo assignmentInfo = getAssignmentInfo(null, partitionPath);
- List<BucketLocation> bucketLocations = new ArrayList<>();
+ List<BucketMetadata> bucketMetadataList = new ArrayList<>();
checkNotNull(
assignmentInfo.partitionId,
"partition id must be not null for " + partitionPath);
if (assignmentInfo.tableAssignment != null) {
TableAssignment tableAssignment = assignmentInfo.tableAssignment;
- bucketLocations =
- toBucketLocations(
- partitionPath,
+ bucketMetadataList =
+ toBucketMetadataList(
assignmentInfo.tableId,
assignmentInfo.partitionId,
tableAssignment,
- listenerName);
+ null);
} else {
LOG.warn("No partition assignment node found for partition {}", partitionPath);
}
- return new PartitionMetadataInfo(
+ return new PartitionMetadata(
assignmentInfo.tableId,
partitionPath.getPartitionName(),
assignmentInfo.partitionId,
- bucketLocations);
+ bucketMetadataList);
} catch (PartitionNotExistException e) {
throw e;
} catch (Exception e) {
@@ -591,15 +573,15 @@ private PartitionMetadataInfo getPartitionMetadata(
}
}
- private List<PartitionMetadataInfo> getPartitionMetadata(
- Collection<TablePath> tablePaths, long[] partitionIds, String listenerName) {
+ private List<PartitionMetadata> getPartitionMetadata(
+ Collection<TablePath> tablePaths, long[] partitionIds) {
// todo: hack logic; currently, we can't get partition metadata by partition ids directly,
// in here, we always assume the partition ids must belong to the first argument tablePaths;
// at least, in current client metadata request design, the assumption is true.
// but the assumption is fragile; we should use metadata cache to help to get partition by
// partition ids
- List<PartitionMetadataInfo> partitionMetadataInfos = new ArrayList<>();
+ List<PartitionMetadata> partitionMetadata = new ArrayList<>();
Set<Long> partitionIdSet = new HashSet<>();
for (long partitionId : partitionIds) {
partitionIdSet.add(partitionId);
@@ -616,10 +598,9 @@ private List<PartitionMetadataInfo> getPartitionMetadata(
// the partition is under the table, get the metadata
String partitionName = partitionNameById.get(partitionId);
if (partitionName != null) {
- partitionMetadataInfos.add(
+ partitionMetadata.add(
getPartitionMetadata(
- PhysicalTablePath.of(tablePath, partitionName),
- listenerName));
+ PhysicalTablePath.of(tablePath, partitionName)));
hitPartitionIds.add(partitionId);
}
}
@@ -634,17 +615,16 @@ private List<PartitionMetadataInfo> getPartitionMetadata(
throw new PartitionNotExistException(
"Partition not exist for partition ids: " + partitionIdSet);
}
- return partitionMetadataInfos;
+ return partitionMetadata;
}
- private List<BucketLocation> toBucketLocations(
- PhysicalTablePath physicalTablePath,
+ private List<BucketMetadata> toBucketMetadataList(
long tableId,
@Nullable Long partitionId,
TableAssignment tableAssignment,
- String listenerName)
+ @Nullable Integer leaderEpoch)
throws Exception {
- List<BucketLocation> bucketLocations = new ArrayList<>();
+ List<BucketMetadata> bucketMetadataList = new ArrayList<>();
// iterate each bucket assignment
for (Map.Entry<Integer, BucketAssignment> assignment :
tableAssignment.getBucketAssignments().entrySet()) {
@@ -652,35 +632,13 @@ private List<BucketLocation> toBucketLocations(
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
List<Integer> replicas = assignment.getValue().getReplicas();
- ServerNode[] replicaNode = new ServerNode[replicas.size()];
- Map<Integer, ServerNode> nodes = metadataCache.getAllAliveTabletServers(listenerName);
- for (int i = 0; i < replicas.size(); i++) {
- replicaNode[i] =
- nodes.getOrDefault(
- replicas.get(i),
- // if not in alive node, we set host as ""
- // and port as -1 just like kafka
- // TODO: client will not use this node to connect,
- // should be removed in the future.
- new ServerNode(
- replicas.get(i), "", -1, ServerType.TABLET_SERVER, null));
- }
// now get the leader
Optional<LeaderAndIsr> optLeaderAndIsr = zkClient.getLeaderAndIsr(tableBucket);
- ServerNode leader;
- leader =
- optLeaderAndIsr
- .map(
- leaderAndIsr ->
- metadataCache
- .getAllAliveTabletServers(listenerName)
- .get(leaderAndIsr.leader()))
- .orElse(null);
- bucketLocations.add(
- new BucketLocation(physicalTablePath, tableBucket, leader, replicaNode));
+ Integer leader = optLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null);
+ bucketMetadataList.add(new BucketMetadata(bucketId, leader, leaderEpoch, replicas));
}
- return bucketLocations;
+ return bucketMetadataList;
}
private AssignmentInfo getAssignmentInfo(
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
index e8ddd61693..18a087457e 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/AutoPartitionManager.java
@@ -17,7 +17,6 @@
package com.alibaba.fluss.server.coordinator;
import com.alibaba.fluss.annotation.VisibleForTesting;
-import com.alibaba.fluss.cluster.MetadataCache;
import com.alibaba.fluss.cluster.TabletServerInfo;
import com.alibaba.fluss.config.AutoPartitionTimeUnit;
import com.alibaba.fluss.config.ConfigOptions;
@@ -29,6 +28,7 @@
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.server.metadata.ServerMetadataCache;
import com.alibaba.fluss.server.zk.data.BucketAssignment;
import com.alibaba.fluss.server.zk.data.PartitionAssignment;
import com.alibaba.fluss.server.zk.data.TableRegistration;
@@ -80,7 +80,7 @@ public class AutoPartitionManager implements AutoCloseable {
/** scheduled executor, periodically trigger auto partition. */
private final ScheduledExecutorService periodicExecutor;
- private final MetadataCache metadataCache;
+ private final ServerMetadataCache metadataCache;
private final MetadataManager metadataManager;
private final Clock clock;
@@ -101,7 +101,9 @@ public class AutoPartitionManager implements AutoCloseable {
private final Lock lock = new ReentrantLock();
public AutoPartitionManager(
- MetadataCache metadataCache, MetadataManager metadataManager, Configuration conf) {
+ ServerMetadataCache metadataCache,
+ MetadataManager metadataManager,
+ Configuration conf) {
this(
metadataCache,
metadataManager,
@@ -113,7 +115,7 @@ public AutoPartitionManager(
@VisibleForTesting
AutoPartitionManager(
- MetadataCache metadataCache,
+ ServerMetadataCache metadataCache,
MetadataManager metadataManager,
Configuration conf,
Clock clock,
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java
index b7164db511..52a5e49cb7 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorContext.java
@@ -98,6 +98,7 @@ public class CoordinatorContext {
*/
private final Map> replicasOnOffline = new HashMap<>();
+ private ServerInfo coordinatorServerInfo = null;
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
public CoordinatorContext() {}
public void setLiveTabletServers(List<ServerInfo> servers) {
servers.forEach(server -> liveTabletServers.put(server.id(), server));
}
+ public ServerInfo getCoordinatorServerInfo() {
+ return coordinatorServerInfo;
+ }
+
+ public void setCoordinatorServerInfo(ServerInfo coordinatorServerInfo) {
+ this.coordinatorServerInfo = coordinatorServerInfo;
+ }
+
public void addLiveTabletServer(ServerInfo serverInfo) {
this.liveTabletServers.put(serverInfo.id(), serverInfo);
}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
index 235b0dcf66..49990ebce4 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -68,9 +68,8 @@
import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrResultForBucket;
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshotStore;
-import com.alibaba.fluss.server.metadata.ClusterMetadataInfo;
+import com.alibaba.fluss.server.metadata.CoordinatorMetadataCache;
import com.alibaba.fluss.server.metadata.ServerInfo;
-import com.alibaba.fluss.server.metadata.ServerMetadataCache;
import com.alibaba.fluss.server.metrics.group.CoordinatorMetricGroup;
import com.alibaba.fluss.server.zk.ZooKeeperClient;
import com.alibaba.fluss.server.zk.data.BucketAssignment;
@@ -128,18 +127,13 @@ public class CoordinatorEventProcessor implements EventProcessor {
private final TableChangeWatcher tableChangeWatcher;
private final CoordinatorChannelManager coordinatorChannelManager;
private final TabletServerChangeWatcher tabletServerChangeWatcher;
- private final ServerMetadataCache serverMetadataCache;
+ private final CoordinatorMetadataCache serverMetadataCache;
private final CoordinatorRequestBatch coordinatorRequestBatch;
private final CoordinatorMetricGroup coordinatorMetricGroup;
private final String internalListenerName;
private final CompletedSnapshotStoreManager completedSnapshotStoreManager;
- // in normal case, it won't be null, but from I can see, it'll only be null in unit test
- // since the we won't register a coordinator node in zk.
- // todo: may remove the nullable in the future
- private @Nullable ServerInfo coordinatorServerInfo;
-
// metrics
private volatile int tabletServerCount;
private volatile int offlineBucketCount;
@@ -149,28 +143,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
public CoordinatorEventProcessor(
ZooKeeperClient zooKeeperClient,
- ServerMetadataCache serverMetadataCache,
- CoordinatorChannelManager coordinatorChannelManager,
- AutoPartitionManager autoPartitionManager,
- LakeTableTieringManager lakeTableTieringManager,
- CoordinatorMetricGroup coordinatorMetricGroup,
- Configuration conf,
- ExecutorService ioExecutor) {
- this(
- zooKeeperClient,
- serverMetadataCache,
- coordinatorChannelManager,
- new CoordinatorContext(),
- autoPartitionManager,
- lakeTableTieringManager,
- coordinatorMetricGroup,
- conf,
- ioExecutor);
- }
-
- public CoordinatorEventProcessor(
- ZooKeeperClient zooKeeperClient,
- ServerMetadataCache serverMetadataCache,
+ CoordinatorMetadataCache serverMetadataCache,
CoordinatorChannelManager coordinatorChannelManager,
CoordinatorContext coordinatorContext,
AutoPartitionManager autoPartitionManager,
@@ -187,13 +160,17 @@ public CoordinatorEventProcessor(
new ReplicaStateMachine(
coordinatorContext,
new CoordinatorRequestBatch(
- coordinatorChannelManager, coordinatorEventManager),
+ coordinatorChannelManager,
+ coordinatorEventManager,
+ coordinatorContext),
zooKeeperClient);
this.tableBucketStateMachine =
new TableBucketStateMachine(
coordinatorContext,
new CoordinatorRequestBatch(
- coordinatorChannelManager, coordinatorEventManager),
+ coordinatorChannelManager,
+ coordinatorEventManager,
+ coordinatorContext),
zooKeeperClient);
this.metadataManager = new MetadataManager(zooKeeperClient, conf);
@@ -208,7 +185,8 @@ public CoordinatorEventProcessor(
this.tabletServerChangeWatcher =
new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager);
this.coordinatorRequestBatch =
- new CoordinatorRequestBatch(coordinatorChannelManager, coordinatorEventManager);
+ new CoordinatorRequestBatch(
+ coordinatorChannelManager, coordinatorEventManager, coordinatorContext);
this.completedSnapshotStoreManager =
new CompletedSnapshotStoreManager(
conf.getInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS),
@@ -237,7 +215,7 @@ public CoordinatorEventManager getCoordinatorEventManager() {
}
public void startup() {
- coordinatorServerInfo = getCoordinatorServerInfo();
+ coordinatorContext.setCoordinatorServerInfo(getCoordinatorServerInfo());
// start watchers first so that we won't miss node in zk;
tabletServerChangeWatcher.start();
tableChangeWatcher.start();
@@ -249,16 +227,16 @@ public void startup() {
}
// We need to send UpdateMetadataRequest after the coordinator context is initialized and
- // before the state machines in tableManager
- // are started. This is because tablet servers need to receive the list of live tablet
- // servers from UpdateMetadataRequest before
- // they can process the LeaderRequests that are generated by
- // replicaStateMachine.startup() and
+ // before the state machines in tableManager are started. This is because tablet servers
+ // need to receive the list of live tablet servers from UpdateMetadataRequest before they
+ // can process the LeaderRequests that are generated by replicaStateMachine.startup() and
// partitionStateMachine.startup().
- LOG.info("Sending update metadata request.");
- updateServerMetadataCache(
- Optional.ofNullable(coordinatorServerInfo),
- new HashSet<>(coordinatorContext.getLiveTabletServers().values()));
+ // update the coordinator metadata cache when the CoordinatorServer starts.
+ HashSet<ServerInfo> tabletServerInfoList =
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values());
+ serverMetadataCache.updateMetadata(
+ coordinatorContext.getCoordinatorServerInfo(), tabletServerInfoList);
+ updateTabletServerMetadataCacheWhenStartup(tabletServerInfoList);
// start table manager
tableManager.startup();
@@ -291,7 +269,8 @@ private ServerInfo getCoordinatorServerInfo() {
.orElseGet(
() -> {
LOG.error("Coordinator server address is empty in zookeeper.");
- return null;
+ throw new FlussRuntimeException(
+ "Coordinator server address is empty in zookeeper.");
});
} catch (Exception e) {
throw new FlussRuntimeException("Get coordinator address failed.", e);
@@ -457,9 +436,6 @@ private void onShutdown() {
// first shutdown table manager
tableManager.shutdown();
- // then reset coordinatorContext
- coordinatorContext.resetContext();
-
// then stop watchers
tableChangeWatcher.stop();
tabletServerChangeWatcher.stop();
@@ -563,16 +539,34 @@ private void processCreateTable(CreateTableEvent createTableEvent) {
}
TableInfo tableInfo = createTableEvent.getTableInfo();
coordinatorContext.putTableInfo(tableInfo);
+ TableAssignment tableAssignment = createTableEvent.getTableAssignment();
tableManager.onCreateNewTable(
- tableInfo.getTablePath(),
- tableInfo.getTableId(),
- createTableEvent.getTableAssignment());
+ tableInfo.getTablePath(), tableInfo.getTableId(), tableAssignment);
if (createTableEvent.isAutoPartitionTable()) {
autoPartitionManager.addAutoPartitionTable(tableInfo, true);
}
if (tableInfo.getTableConfig().isDataLakeEnabled()) {
lakeTableTieringManager.addNewLakeTable(tableInfo);
}
+
+ if (!tableInfo.isPartitioned()) {
+ Set<TableBucket> tableBuckets = new HashSet<>();
+ tableAssignment
+ .getBucketAssignments()
+ .keySet()
+ .forEach(bucketId -> tableBuckets.add(new TableBucket(tableId, bucketId)));
+ updateTabletServerMetadataCache(
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ null,
+ null,
+ tableBuckets);
+ } else {
+ updateTabletServerMetadataCache(
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ tableId,
+ null,
+ Collections.emptySet());
+ }
}
private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
@@ -584,33 +578,54 @@ private void processCreatePartition(CreatePartitionEvent createPartitionEvent) {
long tableId = createPartitionEvent.getTableId();
String partitionName = createPartitionEvent.getPartitionName();
+ PartitionAssignment partitionAssignment = createPartitionEvent.getPartitionAssignment();
tableManager.onCreateNewPartition(
createPartitionEvent.getTablePath(),
tableId,
createPartitionEvent.getPartitionId(),
partitionName,
- createPartitionEvent.getPartitionAssignment());
+ partitionAssignment);
autoPartitionManager.addPartition(tableId, partitionName);
+
+ Set<TableBucket> tableBuckets = new HashSet<>();
+ partitionAssignment
+ .getBucketAssignments()
+ .keySet()
+ .forEach(
+ bucketId ->
+ tableBuckets.add(new TableBucket(tableId, partitionId, bucketId)));
+ updateTabletServerMetadataCache(
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ null,
+ null,
+ tableBuckets);
}
private void processDropTable(DropTableEvent dropTableEvent) {
// If this is a primary key table, drop the kv snapshot store.
- TableInfo dropTableInfo = coordinatorContext.getTableInfoById(dropTableEvent.getTableId());
+ long tableId = dropTableEvent.getTableId();
+ TableInfo dropTableInfo = coordinatorContext.getTableInfoById(tableId);
if (dropTableInfo.hasPrimaryKey()) {
- Set deleteTableBuckets =
- coordinatorContext.getAllBucketsForTable(dropTableEvent.getTableId());
+ Set deleteTableBuckets = coordinatorContext.getAllBucketsForTable(tableId);
completedSnapshotStoreManager.removeCompletedSnapshotStoreByTableBuckets(
deleteTableBuckets);
}
- coordinatorContext.queueTableDeletion(Collections.singleton(dropTableEvent.getTableId()));
- tableManager.onDeleteTable(dropTableEvent.getTableId());
+ coordinatorContext.queueTableDeletion(Collections.singleton(tableId));
+ tableManager.onDeleteTable(tableId);
if (dropTableEvent.isAutoPartitionTable()) {
- autoPartitionManager.removeAutoPartitionTable(dropTableEvent.getTableId());
+ autoPartitionManager.removeAutoPartitionTable(tableId);
}
if (dropTableEvent.isDataLakeEnabled()) {
- lakeTableTieringManager.removeLakeTable(dropTableEvent.getTableId());
+ lakeTableTieringManager.removeLakeTable(tableId);
}
+
+ // send update metadata request.
+ updateTabletServerMetadataCache(
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ tableId,
+ null,
+ Collections.emptySet());
}
private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
@@ -631,6 +646,13 @@ private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
coordinatorContext.queuePartitionDeletion(Collections.singleton(tablePartition));
tableManager.onDeletePartition(tableId, dropPartitionEvent.getPartitionId());
autoPartitionManager.removePartition(tableId, dropPartitionEvent.getPartitionName());
+
+ // send update metadata request.
+ updateTabletServerMetadataCache(
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ tableId,
+ tablePartition.getPartitionId(),
+ Collections.emptySet());
}
private void processDeleteReplicaResponseReceived(
@@ -764,10 +786,22 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
ServerNode serverNode = serverInfo.nodeOrThrow(internalListenerName);
coordinatorChannelManager.addTabletServer(serverNode);
- // update server metadata cache.
- updateServerMetadataCache(
- Optional.ofNullable(coordinatorServerInfo),
+ // update the coordinatorServer metadata cache for the newly added tablet server.
+ serverMetadataCache.updateMetadata(
+ coordinatorContext.getCoordinatorServerInfo(),
new HashSet<>(coordinatorContext.getLiveTabletServers().values()));
+ // update server info for all tablet servers.
+ updateTabletServerMetadataCache(
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ null,
+ null,
+ Collections.emptySet());
+ // update table info for the newly added tablet server.
+ updateTabletServerMetadataCache(
+ Collections.singleton(serverInfo),
+ null,
+ null,
+ coordinatorContext.bucketLeaderAndIsr().keySet());
// when a new tablet server comes up, we need to get all replicas of the server
// and transmit them to online
@@ -803,10 +837,6 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
coordinatorContext.removeLiveTabletServer(tabletServerId);
coordinatorChannelManager.removeTabletServer(tabletServerId);
- updateServerMetadataCache(
- Optional.ofNullable(coordinatorServerInfo),
- new HashSet<>(coordinatorContext.getLiveTabletServers().values()));
-
TableBucketStateMachine tableBucketStateMachine = tableManager.getTableBucketStateMachine();
// get all table bucket whose leader is in this server and it not to be deleted
Set bucketsWithOfflineLeader =
@@ -834,6 +864,14 @@ private void processDeadTabletServer(DeadTabletServerEvent deadTabletServerEvent
// trigger OfflineReplica state change for those newly offline replicas
replicaStateMachine.handleStateChanges(replicas, OfflineReplica);
+
+ Set<ServerInfo> serverInfos =
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values());
+ // update coordinatorServer metadata cache.
+ serverMetadataCache.updateMetadata(
+ coordinatorContext.getCoordinatorServerInfo(), serverInfos);
+ // update the tabletServer metadata caches by sending an updateMetadata request.
+ updateTabletServerMetadataCache(serverInfos, null, null, bucketsWithOfflineLeader);
}
private List tryProcessAdjustIsr(
@@ -1098,20 +1136,53 @@ private void validateFencedEvent(FencedCoordinatorEvent event) {
}
}
- /** Update metadata cache for coordinator server and all remote tablet servers. */
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- private void updateServerMetadataCache(
- Optional coordinatorServer, Set aliveTabletServers) {
- // 1. update local metadata cache.
- serverMetadataCache.updateClusterMetadata(
- new ClusterMetadataInfo(coordinatorServer, aliveTabletServers));
+ /** Update the metadata cache for all remote tablet servers when the coordinator starts up. */
+ private void updateTabletServerMetadataCacheWhenStartup(Set<ServerInfo> aliveTabletServers) {
+ coordinatorRequestBatch.newBatch();
+ Set<Integer> serverIds =
+ aliveTabletServers.stream().map(ServerInfo::id).collect(Collectors.toSet());
+
+ Set<Long> tablesToBeDeleted = coordinatorContext.getTablesToBeDeleted();
+ tablesToBeDeleted.forEach(
+ tableId ->
+ coordinatorRequestBatch.addUpdateMetadataRequestForTabletServers(
+ serverIds, tableId, null, Collections.emptySet()));
+
+ Set<TablePartition> partitionsToBeDeleted = coordinatorContext.getPartitionsToBeDeleted();
+ partitionsToBeDeleted.forEach(
+ tablePartition ->
+ coordinatorRequestBatch.addUpdateMetadataRequestForTabletServers(
+ serverIds,
+ tablePartition.getTableId(),
+ tablePartition.getPartitionId(),
+ Collections.emptySet()));
+
+ Set<TableBucket> tableBuckets = new HashSet<>();
+ coordinatorContext
+ .bucketLeaderAndIsr()
+ .forEach(
+ (tableBucket, leaderAndIsr) -> {
+ if (!coordinatorContext.isToBeDeleted(tableBucket)) {
+ tableBuckets.add(tableBucket);
+ }
+ });
+ coordinatorRequestBatch.addUpdateMetadataRequestForTabletServers(
+ serverIds, null, null, tableBuckets);
+
+ coordinatorRequestBatch.sendUpdateMetadataRequest();
+ }
- // 2. send update metadata request to all alive tablet servers
+ /** Update metadata cache for all remote tablet servers. */
+ private void updateTabletServerMetadataCache(
+ Set<ServerInfo> aliveTabletServers,
+ @Nullable Long tableId,
+ @Nullable Long partitionId,
+ Set<TableBucket> tableBuckets) {
coordinatorRequestBatch.newBatch();
Set<Integer> serverIds =
aliveTabletServers.stream().map(ServerInfo::id).collect(Collectors.toSet());
coordinatorRequestBatch.addUpdateMetadataRequestForTabletServers(
- serverIds, coordinatorServer, aliveTabletServers);
+ serverIds, tableId, partitionId, tableBuckets);
coordinatorRequestBatch.sendUpdateMetadataRequest();
}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java
index c3100c0147..2ed4a34ae0 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java
@@ -17,8 +17,12 @@
package com.alibaba.fluss.server.coordinator;
import com.alibaba.fluss.metadata.PhysicalTablePath;
+import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableBucketReplica;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePartition;
import com.alibaba.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest;
import com.alibaba.fluss.rpc.messages.NotifyLakeTableOffsetRequest;
import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrRequest;
@@ -35,21 +39,31 @@
import com.alibaba.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
import com.alibaba.fluss.server.entity.DeleteReplicaResultForBucket;
import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrData;
-import com.alibaba.fluss.server.metadata.ServerInfo;
+import com.alibaba.fluss.server.metadata.BucketMetadata;
+import com.alibaba.fluss.server.metadata.PartitionMetadata;
+import com.alibaba.fluss.server.metadata.TableMetadata;
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID;
+import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_NAME;
+import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID;
+import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_PATH;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getNotifyLeaderAndIsrResponseData;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeNotifyBucketLeaderAndIsr;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeNotifyKvSnapshotOffsetRequest;
@@ -65,15 +79,25 @@ public class CoordinatorRequestBatch {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorRequestBatch.class);
+ private static final Schema EMPTY_SCHEMA = Schema.newBuilder().build();
+ private static final TableDescriptor EMPTY_TABLE_DESCRIPTOR =
+ TableDescriptor.builder().schema(EMPTY_SCHEMA).distributedBy(0).build();
+
// a map from tablet server to notify the leader and isr for each bucket.
private final Map>
notifyLeaderAndIsrRequestMap = new HashMap<>();
// a map from tablet server to stop replica for each bucket.
private final Map> stopReplicaRequestMap =
new HashMap<>();
- // a map from tablet server to update metadata request
- private final Map<Integer, UpdateMetadataRequest> updateMetadataRequestTabletServerSet =
+
+ // the set of tabletServer ids to send the update metadata request to.
+ private final Set<Integer> updateMetadataRequestTabletServerSet = new HashSet<>();
+ // a map from tableId to the bucket metadata to update.
+ private final Map<Long, List<BucketMetadata>> updateMetadataRequestBucketMap = new HashMap<>();
+ // a map from tableId to (a map from partitionId to the bucket metadata) to update.
+ private final Map<Long, Map<Long, List<BucketMetadata>>> updateMetadataRequestPartitionMap =
new HashMap<>();
+
// a map from tablet server to notify remote log offsets request.
private final Map notifyRemoteLogOffsetsRequestMap =
new HashMap<>();
@@ -86,11 +110,15 @@ public class CoordinatorRequestBatch {
private final CoordinatorChannelManager coordinatorChannelManager;
private final EventManager eventManager;
+ private final CoordinatorContext coordinatorContext;
public CoordinatorRequestBatch(
- CoordinatorChannelManager coordinatorChannelManager, EventManager eventManager) {
+ CoordinatorChannelManager coordinatorChannelManager,
+ EventManager eventManager,
+ CoordinatorContext coordinatorContext) {
this.coordinatorChannelManager = coordinatorChannelManager;
this.eventManager = eventManager;
+ this.coordinatorContext = coordinatorContext;
}
public void newBatch() {
@@ -198,6 +226,14 @@ public void addNotifyLeaderRequestForTabletServers(
leaderAndIsr));
notifyBucketLeaderAndIsr.put(tableBucket, notifyLeaderAndIsrForBucket);
});
+
+ // TODO for these cases, we can send NotifyLeaderAndIsrRequest instead of another
+ // updateMetadata request, trace by: https://github.com/alibaba/fluss/issues/983
+ addUpdateMetadataRequestForTabletServers(
+ coordinatorContext.getLiveTabletServers().keySet(),
+ null,
+ null,
+ Collections.singleton(tableBucket));
}
public void addStopReplicaRequestForTabletServers(
@@ -223,19 +259,87 @@ public void addStopReplicaRequestForTabletServers(
});
}
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+ /**
+ * Add an updateMetadata request for tabletServers when one of these cases happens (a usage
+ * sketch follows this hunk):
+ *
+ * <ul>
+ *   <li>case1: the coordinatorServer restarts and re-initializes the coordinatorContext
+ *   <li>case2: a table is created and its bucketAssignment is generated; this happens for a
+ *       newly created non-partitioned table
+ *   <li>case3: a table is created but no bucketAssignment is generated yet; this happens for a
+ *       newly created partitioned table
+ *   <li>case4: a table is queued for deletion; in this case we send an empty tableBucket set
+ *       and set the tableId to {@link TableMetadata#DELETED_TABLE_ID} to avoid sending useless
+ *       info to the tabletServers
+ *   <li>case5: a partition is created and the bucketAssignment of this partition is generated
+ *   <li>case6: a partition is queued for deletion; in this case we send an empty tableBucket
+ *       set and set the partitionId to {@link PartitionMetadata#DELETED_PARTITION_ID} to avoid
+ *       sending useless info to the tabletServers
+ *   <li>case7: the leader and isr changed for the given tableBuckets
+ *   <li>case8: a new tabletServer is added to the cluster
+ *   <li>case9: a tabletServer is removed from the cluster
+ * </ul>
+ */
public void addUpdateMetadataRequestForTabletServers(
Set<Integer> tabletServers,
- Optional<ServerInfo> coordinatorServer,
- Set<ServerInfo> aliveTabletServers) {
+ @Nullable Long tableId,
+ @Nullable Long partitionId,
+ Set<TableBucket> tableBuckets) {
+ // case9:
tabletServers.stream()
.filter(s -> s >= 0)
- .forEach(
- id ->
- updateMetadataRequestTabletServerSet.put(
- id,
- makeUpdateMetadataRequest(
- coordinatorServer, aliveTabletServers)));
+ .forEach(updateMetadataRequestTabletServerSet::add);
+
+ if (tableId != null) {
+ if (partitionId != null) {
+ // case6
+ updateMetadataRequestPartitionMap
+ .computeIfAbsent(tableId, k -> new HashMap<>())
+ .put(partitionId, Collections.emptyList());
+ } else {
+ // case3, case4
+ updateMetadataRequestBucketMap.put(tableId, Collections.emptyList());
+ }
+ } else {
+ // case1, case2, case5, case7, case8
+ for (TableBucket tableBucket : tableBuckets) {
+ long currentTableId = tableBucket.getTableId();
+ Long currentPartitionId = tableBucket.getPartitionId();
+ Optional<LeaderAndIsr> bucketLeaderAndIsr =
+ coordinatorContext.getBucketLeaderAndIsr(tableBucket);
+ Integer leaderEpoch =
+ bucketLeaderAndIsr.map(LeaderAndIsr::leaderEpoch).orElse(null);
+ Integer leader = bucketLeaderAndIsr.map(LeaderAndIsr::leader).orElse(null);
+ if (currentPartitionId == null) {
+ Map<Integer, List<Integer>> tableAssignment =
+ coordinatorContext.getTableAssignment(currentTableId);
+ BucketMetadata bucketMetadata =
+ new BucketMetadata(
+ tableBucket.getBucket(),
+ leader,
+ leaderEpoch,
+ tableAssignment.get(tableBucket.getBucket()));
+ updateMetadataRequestBucketMap
+ .computeIfAbsent(currentTableId, k -> new ArrayList<>())
+ .add(bucketMetadata);
+ } else {
+ TablePartition tablePartition =
+ new TablePartition(currentTableId, currentPartitionId);
+ Map<Integer, List<Integer>> partitionAssignment =
+ coordinatorContext.getPartitionAssignment(tablePartition);
+ BucketMetadata bucketMetadata =
+ new BucketMetadata(
+ tableBucket.getBucket(),
+ leader,
+ leaderEpoch,
+ partitionAssignment.get(tableBucket.getBucket()));
+ updateMetadataRequestPartitionMap
+ .computeIfAbsent(currentTableId, k -> new HashMap<>())
+ .computeIfAbsent(tableBucket.getPartitionId(), k -> new ArrayList<>())
+ .add(bucketMetadata);
+ }
+ }
+ }
}
public void addNotifyRemoteLogOffsetsRequestForTabletServers(
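To make the case list above concrete, here is a non-authoritative usage sketch of the three call shapes (the ids are invented; `coordinatorRequestBatch` and `coordinatorContext` are the fields shown elsewhere in this patch):

```java
// Illustrative only: ids are made up, the call shapes mirror the javadoc cases above.
Set<Integer> liveServerIds = coordinatorContext.getLiveTabletServers().keySet();
coordinatorRequestBatch.newBatch();

// case4: whole table queued for deletion -> tableId set, no partitionId, empty bucket set
coordinatorRequestBatch.addUpdateMetadataRequestForTabletServers(
        liveServerIds, 1001L, null, Collections.emptySet());

// case6: one partition queued for deletion -> tableId and partitionId set, empty bucket set
coordinatorRequestBatch.addUpdateMetadataRequestForTabletServers(
        liveServerIds, 1001L, 42L, Collections.emptySet());

// case2/5/7: concrete buckets changed -> ids are null, the buckets themselves carry the detail
coordinatorRequestBatch.addUpdateMetadataRequestForTabletServers(
        liveServerIds, null, null, Collections.singleton(new TableBucket(1001L, 0)));

coordinatorRequestBatch.sendUpdateMetadataRequest();
```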
@@ -419,10 +523,9 @@ private void sendStopRequest(int coordinatorEpoch) {
}
public void sendUpdateMetadataRequest() {
- for (Map.Entry<Integer, UpdateMetadataRequest> updateMetadataRequestEntry :
- updateMetadataRequestTabletServerSet.entrySet()) {
- Integer serverId = updateMetadataRequestEntry.getKey();
- UpdateMetadataRequest updateMetadataRequest = updateMetadataRequestEntry.getValue();
+ // Build updateMetadataRequest.
+ UpdateMetadataRequest updateMetadataRequest = buildUpdateMetadataRequest();
+ for (Integer serverId : updateMetadataRequestTabletServerSet) {
coordinatorChannelManager.sendUpdateMetadataRequest(
serverId,
updateMetadataRequest,
@@ -435,6 +538,8 @@ public void sendUpdateMetadataRequest() {
});
}
updateMetadataRequestTabletServerSet.clear();
+ updateMetadataRequestBucketMap.clear();
+ updateMetadataRequestPartitionMap.clear();
}
public void sendNotifyRemoteLogOffsetsRequest(int coordinatorEpoch) {
@@ -505,4 +610,88 @@ public void sendNotifyLakeTableOffsetRequest(int coordinatorEpoch) {
}
notifyLakeTableOffsetRequestMap.clear();
}
+
+ private UpdateMetadataRequest buildUpdateMetadataRequest() {
+ List<TableMetadata> tableMetadataList = new ArrayList<>();
+ updateMetadataRequestBucketMap.forEach(
+ (tableId, bucketMetadataList) -> {
+ TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId);
+ boolean tableQueuedForDeletion =
+ coordinatorContext.isTableQueuedForDeletion(tableId);
+ TableInfo newTableInfo;
+ if (tableInfo == null) {
+ if (tableQueuedForDeletion) {
+ newTableInfo =
+ TableInfo.of(
+ DELETED_TABLE_PATH,
+ tableId,
+ 0,
+ EMPTY_TABLE_DESCRIPTOR,
+ -1L,
+ -1L);
+ } else {
+ throw new IllegalStateException(
+ "Table info is null for table " + tableId);
+ }
+ } else {
+ newTableInfo =
+ tableQueuedForDeletion
+ ? TableInfo.of(
+ tableInfo.getTablePath(),
+ DELETED_TABLE_ID,
+ 0,
+ EMPTY_TABLE_DESCRIPTOR,
+ -1L,
+ -1L)
+ : tableInfo;
+ }
+
+ tableMetadataList.add(new TableMetadata(newTableInfo, bucketMetadataList));
+ });
+
+ List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
+ updateMetadataRequestPartitionMap.forEach(
+ (tableId, partitionIdToBucketMetadataMap) -> {
+ for (Map.Entry<Long, List<BucketMetadata>> kvEntry :
+ partitionIdToBucketMetadataMap.entrySet()) {
+ Long partitionId = kvEntry.getKey();
+ boolean partitionQueuedForDeletion =
+ coordinatorContext.isPartitionQueuedForDeletion(
+ new TablePartition(tableId, partitionId));
+ String partitionName = coordinatorContext.getPartitionName(partitionId);
+ PartitionMetadata partitionMetadata;
+ if (partitionName == null) {
+ if (partitionQueuedForDeletion) {
+ partitionMetadata =
+ new PartitionMetadata(
+ tableId,
+ DELETED_PARTITION_NAME,
+ partitionId,
+ kvEntry.getValue());
+ } else {
+ throw new IllegalStateException(
+ "Partition name is null for partition " + partitionId);
+ }
+ } else {
+ partitionMetadata =
+ new PartitionMetadata(
+ tableId,
+ partitionName,
+ partitionQueuedForDeletion
+ ? DELETED_PARTITION_ID
+ : partitionId,
+ kvEntry.getValue());
+ }
+ partitionMetadataList.add(partitionMetadata);
+ }
+ });
+
+ // TODO: distinguish which tablet servers need to be updated instead of sending to all live
+ // tablet servers.
+ return makeUpdateMetadataRequest(
+ coordinatorContext.getCoordinatorServerInfo(),
+ new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
+ tableMetadataList,
+ partitionMetadataList);
+ }
}
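For clarity on `buildUpdateMetadataRequest` above: a table that is queued for deletion is not dropped from the request but replaced by a marker entry. A minimal sketch of that marker, reusing the constants referenced in the method (the interpretation of the marker is an assumption, consistent with the deletion TODO in FlussApi.proto):

```java
// Sketch of the deletion marker assembled above: the table path is kept when still known, but
// the id becomes DELETED_TABLE_ID and the descriptor is empty, which is how deletion is
// currently signalled to the receiving tablet servers (a dedicated flag is still a TODO).
TableInfo deletionMarker =
        TableInfo.of(
                tableInfo.getTablePath(), DELETED_TABLE_ID, 0, EMPTY_TABLE_DESCRIPTOR, -1L, -1L);
tableMetadataList.add(new TableMetadata(deletionMarker, Collections.emptyList()));
```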
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
index a80c6e6c42..d9e0e55c17 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
@@ -35,8 +35,8 @@
import com.alibaba.fluss.server.ServerBase;
import com.alibaba.fluss.server.authorizer.Authorizer;
import com.alibaba.fluss.server.authorizer.AuthorizerLoader;
+import com.alibaba.fluss.server.metadata.CoordinatorMetadataCache;
import com.alibaba.fluss.server.metadata.ServerMetadataCache;
-import com.alibaba.fluss.server.metadata.ServerMetadataCacheImpl;
import com.alibaba.fluss.server.metrics.ServerMetricUtils;
import com.alibaba.fluss.server.metrics.group.CoordinatorMetricGroup;
import com.alibaba.fluss.server.zk.ZooKeeperClient;
@@ -112,7 +112,7 @@ public class CoordinatorServer extends ServerBase {
private CoordinatorService coordinatorService;
@GuardedBy("lock")
- private ServerMetadataCache metadataCache;
+ private CoordinatorMetadataCache metadataCache;
@GuardedBy("lock")
private CoordinatorChannelManager coordinatorChannelManager;
@@ -136,6 +136,9 @@ public class CoordinatorServer extends ServerBase {
@Nullable
private Authorizer authorizer;
+ @GuardedBy("lock")
+ private CoordinatorContext coordinatorContext;
+
public CoordinatorServer(Configuration conf) {
super(conf);
validateConfigs(conf);
@@ -167,7 +170,8 @@ protected void startServices() throws Exception {
this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
- this.metadataCache = new ServerMetadataCacheImpl();
+ this.coordinatorContext = new CoordinatorContext();
+ this.metadataCache = new CoordinatorMetadataCache();
this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);
if (authorizer != null) {
@@ -225,6 +229,7 @@ protected void startServices() throws Exception {
zkClient,
metadataCache,
coordinatorChannelManager,
+ coordinatorContext,
autoPartitionManager,
lakeTableTieringManager,
serverMetricGroup,
@@ -372,6 +377,15 @@ CompletableFuture stopServices() {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}
+ try {
+ if (coordinatorContext != null) {
+ // then reset coordinatorContext
+ coordinatorContext.resetContext();
+ }
+ } catch (Throwable t) {
+ exception = ExceptionUtils.firstOrSuppressed(t, exception);
+ }
+
try {
if (lakeTableTieringManager != null) {
lakeTableTieringManager.close();
@@ -435,6 +449,11 @@ public RpcServer getRpcServer() {
return rpcServer;
}
+ @VisibleForTesting
+ public ServerMetadataCache getMetadataCache() {
+ return metadataCache;
+ }
+
private static void validateConfigs(Configuration conf) {
if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
throw new IllegalConfigurationException(
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/DefaultCompletedKvSnapshotCommitter.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/DefaultCompletedKvSnapshotCommitter.java
index f5123bfffe..415cf54315 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/DefaultCompletedKvSnapshotCommitter.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/snapshot/DefaultCompletedKvSnapshotCommitter.java
@@ -16,17 +16,16 @@
package com.alibaba.fluss.server.kv.snapshot;
-import com.alibaba.fluss.cluster.MetadataCache;
import com.alibaba.fluss.cluster.ServerNode;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.rpc.GatewayClientProxy;
import com.alibaba.fluss.rpc.RpcClient;
import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
+import com.alibaba.fluss.server.metadata.ServerMetadataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Optional;
import java.util.function.Supplier;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeCommitKvSnapshotRequest;
@@ -47,7 +46,7 @@ public DefaultCompletedKvSnapshotCommitter(CoordinatorGateway coordinatorGateway
}
public static DefaultCompletedKvSnapshotCommitter create(
- RpcClient rpcClient, MetadataCache metadataCache, String interListenerName) {
+ RpcClient rpcClient, ServerMetadataCache metadataCache, String interListenerName) {
CoordinatorServerSupplier coordinatorServerSupplier =
new CoordinatorServerSupplier(metadataCache, interListenerName);
return new DefaultCompletedKvSnapshotCommitter(
@@ -69,18 +68,19 @@ private static class CoordinatorServerSupplier implements Supplier<ServerNode> {
private static final int BACK_OFF_MILLS = 500;
- private final MetadataCache metadataCache;
+ private final ServerMetadataCache metadataCache;
private final String interListenerName;
- public CoordinatorServerSupplier(MetadataCache metadataCache, String interListenerName) {
+ public CoordinatorServerSupplier(
+ ServerMetadataCache metadataCache, String interListenerName) {
this.metadataCache = metadataCache;
this.interListenerName = interListenerName;
}
@Override
public ServerNode get() {
- Optional<ServerNode> serverNode = metadataCache.getCoordinatorServer(interListenerName);
- if (!serverNode.isPresent()) {
+ ServerNode serverNode = metadataCache.getCoordinatorServer(interListenerName);
+ if (serverNode == null) {
LOG.info("No coordinator provided, retrying after backoff.");
// backoff some times
try {
@@ -92,7 +92,7 @@ public ServerNode get() {
}
return get();
}
- return serverNode.get();
+ return serverNode;
}
}
}
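The `CoordinatorServerSupplier` above retries by recursing into `get()` after a fixed backoff. A minimal iterative sketch of the same behavior (an alternative shape under the same fields, not what the patch ships):

```java
// Iterative equivalent of the recursive get(): poll the cache until a coordinator node
// appears, backing off between attempts and preserving the interrupt flag.
ServerNode coordinator;
while ((coordinator = metadataCache.getCoordinatorServer(interListenerName)) == null) {
    LOG.info("No coordinator provided, retrying after backoff.");
    try {
        Thread.sleep(BACK_OFF_MILLS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new FlussRuntimeException("Interrupted while waiting for the coordinator.", e);
    }
}
return coordinator;
```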
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/AbstractServerMetadataCache.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/AbstractServerMetadataCache.java
deleted file mode 100644
index 2df0eb618e..0000000000
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/AbstractServerMetadataCache.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.server.metadata;
-
-import com.alibaba.fluss.cluster.ServerNode;
-import com.alibaba.fluss.cluster.TabletServerInfo;
-import com.alibaba.fluss.metadata.PhysicalTablePath;
-import com.alibaba.fluss.server.coordinator.CoordinatorServer;
-import com.alibaba.fluss.server.tablet.TabletServer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * The abstract server metadata cache to cache the cluster metadata info. This cache is updated
- * through UpdateMetadataRequest from the {@link CoordinatorServer}. {@link CoordinatorServer} and
- * each {@link TabletServer} maintains the same cache, asynchronously.
- */
-public abstract class AbstractServerMetadataCache implements ServerMetadataCache {
-
- /**
- * This is cache state. every Cluster instance is immutable, and updates (performed under a
- * lock) replace the value with a completely new one. this means reads (which are not under any
- * lock) need to grab the value of this ONCE and retain that read copy for the duration of their
- * operation.
- *
- * multiple reads of this value risk getting different snapshots.
- */
- protected volatile ServerCluster clusterMetadata;
-
- protected volatile Map<Long, PhysicalTablePath> tableBucketMetadata;
-
- public AbstractServerMetadataCache() {
- // no coordinator server address while creating.
- this.clusterMetadata = ServerCluster.empty();
- this.tableBucketMetadata = new HashMap<>();
- }
-
- @Override
- public boolean isAliveTabletServer(int serverId) {
- Set<TabletServerInfo> aliveTabletServersById = clusterMetadata.getAliveTabletServerInfos();
- for (TabletServerInfo tabletServerInfo : aliveTabletServersById) {
- if (tabletServerInfo.getId() == serverId) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public Optional<ServerNode> getTabletServer(int serverId, String listenerName) {
- return clusterMetadata.getAliveTabletServersById(serverId, listenerName);
- }
-
- @Override
- public Map<Integer, ServerNode> getAllAliveTabletServers(String listenerName) {
- return clusterMetadata.getAliveTabletServers(listenerName);
- }
-
- @Override
- public Optional<ServerNode> getCoordinatorServer(String listenerName) {
- return clusterMetadata.getCoordinatorServer(listenerName);
- }
-
- @Override
- public Set<TabletServerInfo> getAliveTabletServerInfos() {
- return clusterMetadata.getAliveTabletServerInfos();
- }
-
- @Override
- public PhysicalTablePath getTablePath(long tableId) {
- return tableBucketMetadata.get(tableId);
- }
-}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/BucketMetadata.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/BucketMetadata.java
new file mode 100644
index 0000000000..83ae32d054
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/BucketMetadata.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.metadata;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.OptionalInt;
+
+/** This entity is used to describe the bucket metadata. */
+public class BucketMetadata {
+ private final int bucketId;
+ private final @Nullable Integer leaderId;
+ private final @Nullable Integer leaderEpoch;
+ private final List replicas;
+
+ public BucketMetadata(
+ int bucketId,
+ @Nullable Integer leaderId,
+ @Nullable Integer leaderEpoch,
+ List replicas) {
+ this.bucketId = bucketId;
+ this.leaderId = leaderId;
+ this.leaderEpoch = leaderEpoch;
+ this.replicas = replicas;
+ }
+
+ public int getBucketId() {
+ return bucketId;
+ }
+
+ public OptionalInt getLeaderId() {
+ return leaderId == null ? OptionalInt.empty() : OptionalInt.of(leaderId);
+ }
+
+ public OptionalInt getLeaderEpoch() {
+ return leaderEpoch == null ? OptionalInt.empty() : OptionalInt.of(leaderEpoch);
+ }
+
+ public List getReplicas() {
+ return replicas;
+ }
+}
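
A minimal usage sketch for the new BucketMetadata entity (the generic type parameters are not visible in the rendered hunk; the replicas list is assumed to hold tablet-server ids): a bucket without an elected leader carries null leaderId/leaderEpoch, which the accessors surface as an empty OptionalInt.

import com.alibaba.fluss.server.metadata.BucketMetadata;

import java.util.Arrays;

public class BucketMetadataSketch {
    public static void main(String[] args) {
        // Bucket 0 has no elected leader yet: the nullable fields map to OptionalInt.empty().
        BucketMetadata leaderless = new BucketMetadata(0, null, null, Arrays.asList(1, 2, 3));
        System.out.println(leaderless.getLeaderId().isPresent());   // false

        // Bucket 1 is led by server 2 at leader epoch 0.
        BucketMetadata led = new BucketMetadata(1, 2, 0, Arrays.asList(2, 3, 4));
        System.out.println(led.getLeaderId().getAsInt());           // 2
        System.out.println(led.getReplicas());                      // [2, 3, 4]
    }
}
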
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ClusterMetadata.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ClusterMetadata.java
new file mode 100644
index 0000000000..c41c84d25e
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ClusterMetadata.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.metadata;
+
+import com.alibaba.fluss.annotation.VisibleForTesting;
+import com.alibaba.fluss.rpc.messages.MetadataResponse;
+import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This entity is used to describe the cluster metadata, including the coordinator server address,
+ * the alive tablet servers, and the {@link TableMetadata} and {@link PartitionMetadata} lists. It
+ * can be used to build a {@link MetadataResponse} or be converted from an {@link UpdateMetadataRequest}.
+ */
+public class ClusterMetadata {
+
+ private final @Nullable ServerInfo coordinatorServer;
+ private final Set aliveTabletServers;
+ private final List tableMetadataList;
+ private final List partitionMetadataList;
+
+ @VisibleForTesting
+ public ClusterMetadata(
+ @Nullable ServerInfo coordinatorServer, Set aliveTabletServers) {
+ this(
+ coordinatorServer,
+ aliveTabletServers,
+ Collections.emptyList(),
+ Collections.emptyList());
+ }
+
+ public ClusterMetadata(
+ @Nullable ServerInfo coordinatorServer,
+ Set aliveTabletServers,
+ List tableMetadataList,
+ List partitionMetadataList) {
+ this.coordinatorServer = coordinatorServer;
+ this.aliveTabletServers = aliveTabletServers;
+ this.tableMetadataList = tableMetadataList;
+ this.partitionMetadataList = partitionMetadataList;
+ }
+
+ public @Nullable ServerInfo getCoordinatorServer() {
+ return coordinatorServer;
+ }
+
+ public Set getAliveTabletServers() {
+ return aliveTabletServers;
+ }
+
+ public List getTableMetadataList() {
+ return tableMetadataList;
+ }
+
+ public List getPartitionMetadataList() {
+ return partitionMetadataList;
+ }
+}
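
A short sketch of how the two constructors relate (parameter types inferred from the accessors): the two-argument form, marked @VisibleForTesting, is shorthand for the four-argument form with empty table and partition lists, i.e. a pure who-is-alive update with no table or partition payload.

import com.alibaba.fluss.server.metadata.ClusterMetadata;
import com.alibaba.fluss.server.metadata.ServerInfo;

import java.util.Collections;
import java.util.Set;

public class ClusterMetadataSketch {
    static ClusterMetadata serversOnlyView(ServerInfo coordinator, Set<ServerInfo> aliveTabletServers) {
        // Equivalent to new ClusterMetadata(coordinator, aliveTabletServers).
        return new ClusterMetadata(
                coordinator, aliveTabletServers, Collections.emptyList(), Collections.emptyList());
    }
}
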
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ClusterMetadataInfo.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ClusterMetadataInfo.java
deleted file mode 100644
index 8fa5567387..0000000000
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ClusterMetadataInfo.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.server.metadata;
-
-import com.alibaba.fluss.cluster.BucketLocation;
-import com.alibaba.fluss.cluster.Endpoint;
-import com.alibaba.fluss.cluster.ServerNode;
-import com.alibaba.fluss.cluster.ServerType;
-import com.alibaba.fluss.config.ConfigOptions;
-import com.alibaba.fluss.metadata.TableInfo;
-import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.rpc.messages.MetadataResponse;
-import com.alibaba.fluss.rpc.messages.PbBucketMetadata;
-import com.alibaba.fluss.rpc.messages.PbPartitionMetadata;
-import com.alibaba.fluss.rpc.messages.PbServerNode;
-import com.alibaba.fluss.rpc.messages.PbTableMetadata;
-import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * This entity used to describe the cluster metadata info, including coordinator server address,
- * alive tablets servers and {@link TableMetadataInfo} list, which can be used to build {@link
- * MetadataResponse} or convert from {@link UpdateMetadataRequest}.
- */
-public class ClusterMetadataInfo {
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- private final Optional coordinatorServer;
-
- private final Set aliveTabletServers;
- private final List tableMetadataInfos;
- private final List partitionMetadataInfos;
-
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- public ClusterMetadataInfo(
- Optional coordinatorServer, Set aliveTabletServers) {
- this(
- coordinatorServer,
- aliveTabletServers,
- Collections.emptyList(),
- Collections.emptyList());
- }
-
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- public ClusterMetadataInfo(
- Optional coordinatorServer,
- Set aliveTabletServers,
- List tableMetadataInfos,
- List partitionMetadataInfos) {
- this.coordinatorServer = coordinatorServer;
- this.aliveTabletServers = aliveTabletServers;
- this.tableMetadataInfos = tableMetadataInfos;
- this.partitionMetadataInfos = partitionMetadataInfos;
- }
-
- public Optional getCoordinatorServer() {
- return coordinatorServer;
- }
-
- public Set getAliveTabletServers() {
- return aliveTabletServers;
- }
-
- public static MetadataResponse toMetadataResponse(
- Optional coordinatorServer,
- Set aliveTabletServers,
- List tableMetadataInfos,
- List partitionMetadataInfos) {
- MetadataResponse metadataResponse = new MetadataResponse();
-
- if (coordinatorServer.isPresent()) {
- ServerNode server = coordinatorServer.get();
- metadataResponse
- .setCoordinatorServer()
- .setNodeId(server.id())
- .setHost(server.host())
- .setPort(server.port());
- }
-
- List serverNodeList = new ArrayList<>();
- for (ServerNode serverNode : aliveTabletServers) {
- PbServerNode tabletServerNode =
- new PbServerNode()
- .setNodeId(serverNode.id())
- .setHost(serverNode.host())
- .setPort(serverNode.port());
- if (serverNode.rack() != null) {
- tabletServerNode.setRack(serverNode.rack());
- }
- serverNodeList.add(tabletServerNode);
- }
-
- List tableMetadatas = new ArrayList<>();
- for (TableMetadataInfo tableMetadataInfo : tableMetadataInfos) {
- TableInfo tableInfo = tableMetadataInfo.getTableInfo();
- PbTableMetadata tableMetadata =
- new PbTableMetadata()
- .setTableId(tableInfo.getTableId())
- .setSchemaId(tableInfo.getSchemaId())
- .setTableJson(tableInfo.toTableDescriptor().toJsonBytes())
- .setCreatedTime(tableInfo.getCreatedTime())
- .setModifiedTime(tableInfo.getModifiedTime());
- TablePath tablePath = tableInfo.getTablePath();
- tableMetadata
- .setTablePath()
- .setDatabaseName(tablePath.getDatabaseName())
- .setTableName(tablePath.getTableName());
- tableMetadata.addAllBucketMetadatas(
- toPbTableBucketMetadata(tableMetadataInfo.getBucketLocations()));
-
- tableMetadatas.add(tableMetadata);
- }
-
- List partitionMetadatas = new ArrayList<>();
- for (PartitionMetadataInfo partitionMetadataInfo : partitionMetadataInfos) {
- PbPartitionMetadata pbPartitionMetadata =
- new PbPartitionMetadata()
- .setTableId(partitionMetadataInfo.getTableId())
- .setPartitionId(partitionMetadataInfo.getPartitionId())
- .setPartitionName(partitionMetadataInfo.getPartitionName());
- pbPartitionMetadata.addAllBucketMetadatas(
- toPbTableBucketMetadata(partitionMetadataInfo.getBucketLocations()));
- partitionMetadatas.add(pbPartitionMetadata);
- }
-
- metadataResponse.addAllTabletServers(serverNodeList);
- metadataResponse.addAllTableMetadatas(tableMetadatas);
- metadataResponse.addAllPartitionMetadatas(partitionMetadatas);
- return metadataResponse;
- }
-
- private static List toPbTableBucketMetadata(
- List bucketLocations) {
- List bucketMetadata = new ArrayList<>();
- for (BucketLocation bucketLocation : bucketLocations) {
- PbBucketMetadata tableBucketMetadata =
- new PbBucketMetadata().setBucketId(bucketLocation.getBucketId());
- if (bucketLocation.getLeader() != null) {
- tableBucketMetadata.setLeaderId(bucketLocation.getLeader().id());
- }
-
- for (ServerNode replica : bucketLocation.getReplicas()) {
- tableBucketMetadata.addReplicaId(replica.id());
- }
-
- bucketMetadata.add(tableBucketMetadata);
- }
- return bucketMetadata;
- }
-
- public static ClusterMetadataInfo fromUpdateMetadataRequest(UpdateMetadataRequest request) {
- Optional coordinatorServer = Optional.empty();
- if (request.hasCoordinatorServer()) {
- PbServerNode pbCoordinatorServer = request.getCoordinatorServer();
- List endpoints =
- pbCoordinatorServer.hasListeners()
- ? Endpoint.fromListenersString(pbCoordinatorServer.getListeners())
- // backward compatible with old version that doesn't have listeners
- : Collections.singletonList(
- new Endpoint(
- pbCoordinatorServer.getHost(),
- pbCoordinatorServer.getPort(),
- // TODO: maybe use internal listener name from conf
- ConfigOptions.INTERNAL_LISTENER_NAME.defaultValue()));
- coordinatorServer =
- Optional.of(
- new ServerInfo(
- pbCoordinatorServer.getNodeId(),
- null,
- endpoints,
- ServerType.COORDINATOR));
- }
-
- Set aliveTabletServers = new HashSet<>();
- for (PbServerNode tabletServer : request.getTabletServersList()) {
- List endpoints =
- tabletServer.hasListeners()
- ? Endpoint.fromListenersString(tabletServer.getListeners())
- // backward compatible with old version that doesn't have listeners
- : Collections.singletonList(
- new Endpoint(
- tabletServer.getHost(),
- tabletServer.getPort(),
- // TODO: maybe use internal listener name from conf
- ConfigOptions.INTERNAL_LISTENER_NAME.defaultValue()));
-
- aliveTabletServers.add(
- new ServerInfo(
- tabletServer.getNodeId(),
- tabletServer.hasRack() ? tabletServer.getRack() : null,
- endpoints,
- ServerType.TABLET_SERVER));
- }
- return new ClusterMetadataInfo(coordinatorServer, aliveTabletServers);
- }
-}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/CoordinatorMetadataCache.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/CoordinatorMetadataCache.java
new file mode 100644
index 0000000000..093ad1d310
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/CoordinatorMetadataCache.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.metadata;
+
+import com.alibaba.fluss.cluster.ServerNode;
+import com.alibaba.fluss.cluster.TabletServerInfo;
+import com.alibaba.fluss.server.coordinator.CoordinatorServer;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
+
+/** The implementation of {@link ServerMetadataCache} for the {@link CoordinatorServer}. */
+public class CoordinatorMetadataCache implements ServerMetadataCache {
+
+ private final Lock metadataLock = new ReentrantLock();
+
+ @GuardedBy("metadataLock")
+ private @Nullable ServerInfo coordinatorServer;
+
+ @GuardedBy("metadataLock")
+ private final Map aliveTabletServers;
+
+ public CoordinatorMetadataCache() {
+ this.coordinatorServer = null;
+ this.aliveTabletServers = new HashMap<>();
+ }
+
+ @Override
+ public boolean isAliveTabletServer(int serverId) {
+ return aliveTabletServers.containsKey(serverId);
+ }
+
+ @Override
+ public Optional getTabletServer(int serverId, String listenerName) {
+ return aliveTabletServers.containsKey(serverId)
+ ? Optional.ofNullable(aliveTabletServers.get(serverId).node(listenerName))
+ : Optional.empty();
+ }
+
+ @Override
+ public Map getAllAliveTabletServers(String listenerName) {
+ Map serverNodes = new HashMap<>();
+ for (Map.Entry entry : aliveTabletServers.entrySet()) {
+ ServerNode serverNode = entry.getValue().node(listenerName);
+ if (serverNode != null) {
+ serverNodes.put(entry.getKey(), serverNode);
+ }
+ }
+ return serverNodes;
+ }
+
+ @Override
+ public @Nullable ServerNode getCoordinatorServer(String listenerName) {
+ return coordinatorServer != null ? coordinatorServer.node(listenerName) : null;
+ }
+
+ @Override
+ public Set getAliveTabletServerInfos() {
+ Set tabletServerInfos = new HashSet<>();
+ aliveTabletServers
+ .values()
+ .forEach(
+ serverInfo ->
+ tabletServerInfos.add(
+ new TabletServerInfo(serverInfo.id(), serverInfo.rack())));
+ return Collections.unmodifiableSet(tabletServerInfos);
+ }
+
+ public void updateMetadata(ServerInfo coordinatorServer, Set serverInfoSet) {
+ inLock(
+ metadataLock,
+ () -> {
+ Map newAliveTableServers = new HashMap<>();
+ for (ServerInfo tabletServer : serverInfoSet) {
+ newAliveTableServers.put(tabletServer.id(), tabletServer);
+ }
+
+ this.coordinatorServer = coordinatorServer;
+ this.aliveTabletServers.clear();
+ this.aliveTabletServers.putAll(newAliveTableServers);
+ });
+ }
+}
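
A hedged wiring sketch for the coordinator-side cache. The ServerInfo(id, rack, endpoints, serverType) and Endpoint(host, port, listenerName) constructor shapes are taken from other hunks in this patch; the hosts, ports and the listener name "INTERNAL" are illustrative. updateMetadata swaps the whole alive-server map under the lock, after which the per-listener lookups resolve against the new set.

import com.alibaba.fluss.cluster.Endpoint;
import com.alibaba.fluss.cluster.ServerType;
import com.alibaba.fluss.server.metadata.CoordinatorMetadataCache;
import com.alibaba.fluss.server.metadata.ServerInfo;

import java.util.Collections;

public class CoordinatorMetadataCacheSketch {
    public static void main(String[] args) {
        CoordinatorMetadataCache cache = new CoordinatorMetadataCache();

        ServerInfo coordinator = new ServerInfo(
                0, null,
                Collections.singletonList(new Endpoint("cs-host", 9123, "INTERNAL")),
                ServerType.COORDINATOR);
        ServerInfo tabletServer = new ServerInfo(
                1, "rack-a",
                Collections.singletonList(new Endpoint("ts-host", 9124, "INTERNAL")),
                ServerType.TABLET_SERVER);

        // Replace the previously cached view in one shot under the cache's internal lock.
        cache.updateMetadata(coordinator, Collections.singleton(tabletServer));

        cache.isAliveTabletServer(1);             // true
        cache.getCoordinatorServer("INTERNAL");   // coordinator node, or null for an unknown listener
        cache.getTabletServer(1, "INTERNAL");     // Optional containing the tablet server node
    }
}
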
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadataInfo.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadata.java
similarity index 53%
rename from fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadataInfo.java
rename to fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadata.java
index 98ee2eb99c..e3bc014292 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadataInfo.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadata.java
@@ -16,27 +16,39 @@
package com.alibaba.fluss.server.metadata;
-import com.alibaba.fluss.cluster.BucketLocation;
-
import java.util.List;
-/** This entity used to describe the table's partition info. */
-public class PartitionMetadataInfo {
+/** This entity is used to describe the table's partition metadata. */
+public class PartitionMetadata {
+
+ /**
+ * The partition name that marks an already deleted partition. It is used in the UpdateMetadata
+ * request to indicate that the partition has been deleted but a residual partitionId remains in
+ * ZooKeeper. In this case, tablet servers need to clear the metadata of this partition.
+ */
+ public static final String DELETED_PARTITION_NAME = "__delete__";
+
+ /**
+ * The partition id that marks an already deleted partition. It is used in the UpdateMetadata
+ * request to indicate that the partition has been deleted, and tablet servers need to clear the
+ * metadata of this partition.
+ */
+ public static final Long DELETED_PARTITION_ID = -2L;
private final long tableId;
private final String partitionName;
private final long partitionId;
- private final List bucketLocations;
+ private final List bucketMetadataList;
- public PartitionMetadataInfo(
+ public PartitionMetadata(
long tableId,
String partitionName,
long partitionId,
- List bucketLocations) {
+ List bucketMetadataList) {
this.tableId = tableId;
this.partitionName = partitionName;
this.partitionId = partitionId;
- this.bucketLocations = bucketLocations;
+ this.bucketMetadataList = bucketMetadataList;
}
public long getTableId() {
@@ -51,7 +63,7 @@ public long getPartitionId() {
return partitionId;
}
- public List getBucketLocations() {
- return bucketLocations;
+ public List getBucketMetadataList() {
+ return bucketMetadataList;
}
}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerCluster.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerCluster.java
deleted file mode 100644
index 4f6543e297..0000000000
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerCluster.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.server.metadata;
-
-import com.alibaba.fluss.cluster.ServerNode;
-import com.alibaba.fluss.cluster.TabletServerInfo;
-
-import javax.annotation.Nullable;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * An immutable representation of a subset of the server nodes in the fluss cluster. Compared to
- * {@link com.alibaba.fluss.cluster.Cluster}, it includes all the endpoints of the server nodes.
- */
-public class ServerCluster {
- private final @Nullable ServerInfo coordinatorServer;
- private final Map aliveTabletServers;
-
- public ServerCluster(
- @Nullable ServerInfo coordinatorServer, Map aliveTabletServers) {
- this.coordinatorServer = coordinatorServer;
- this.aliveTabletServers = aliveTabletServers;
- }
-
- /** Create an empty cluster instance with no nodes and no table-buckets. */
- public static ServerCluster empty() {
- return new ServerCluster(null, Collections.emptyMap());
- }
-
- public Optional getCoordinatorServer(String listenerName) {
- return coordinatorServer == null
- ? Optional.empty()
- : Optional.ofNullable(coordinatorServer.node(listenerName));
- }
-
- public Optional getAliveTabletServersById(int serverId, String listenerName) {
- return (aliveTabletServers == null || !aliveTabletServers.containsKey(serverId))
- ? Optional.empty()
- : Optional.ofNullable(aliveTabletServers.get(serverId).node(listenerName));
- }
-
- public Map getAliveTabletServers(String listenerName) {
- Map serverNodes = new HashMap<>();
- for (Map.Entry entry : aliveTabletServers.entrySet()) {
- ServerNode serverNode = entry.getValue().node(listenerName);
- if (serverNode != null) {
- serverNodes.put(entry.getKey(), serverNode);
- }
- }
- return serverNodes;
- }
-
- public Set getAliveTabletServerInfos() {
- Set tabletServerInfos = new HashSet<>();
- aliveTabletServers
- .values()
- .forEach(
- serverInfo ->
- tabletServerInfos.add(
- new TabletServerInfo(serverInfo.id(), serverInfo.rack())));
- return Collections.unmodifiableSet(tabletServerInfos);
- }
-}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataCache.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataCache.java
index 831f78896c..4f7e95f58d 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataCache.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataCache.java
@@ -16,23 +16,60 @@
package com.alibaba.fluss.server.metadata;
-import com.alibaba.fluss.cluster.MetadataCache;
-import com.alibaba.fluss.metadata.PhysicalTablePath;
+import com.alibaba.fluss.cluster.ServerNode;
+import com.alibaba.fluss.cluster.TabletServerInfo;
-/** Metadata cache for server. it only caches the cluster metadata. */
-public interface ServerMetadataCache extends MetadataCache {
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/** Metadata cache for server. */
+public interface ServerMetadataCache {
/**
- * Update the cluster metadata by the remote update metadata request.
+ * Get the coordinator server node.
*
- * @param clusterMetadataInfo the metadata info.
+ * @return the coordinator server node
*/
- void updateClusterMetadata(ClusterMetadataInfo clusterMetadataInfo);
+ @Nullable
+ ServerNode getCoordinatorServer(String listenerName);
/**
- * Update the table and database metadata by the remote update table leader and isr request.
- * only leader server cache the table which it stored data. This metedata will used to get the
- * table name and database name when consumer and produce.
+ * Check whether the tablet server node with the given tablet server id is alive.
+ *
+ * @param serverId the tablet server id
+ * @return true if the server is alive, false otherwise
+ */
+ boolean isAliveTabletServer(int serverId);
+
+ /**
+ * Get the tablet server.
+ *
+ * @param serverId the tablet server id
+ * @return the tablet server node
*/
- void upsertTableBucketMetadata(Long tableId, PhysicalTablePath physicalTablePath);
+ Optional getTabletServer(int serverId, String listenerName);
+
+ /**
+ * Get all alive tablet server nodes.
+ *
+ * @return all alive tablet server nodes
+ */
+ Map getAllAliveTabletServers(String listenerName);
+
+ Set getAliveTabletServerInfos();
+
+ /** Get the infos of all alive tablet server nodes. */
+ default TabletServerInfo[] getLiveServers() {
+ Set aliveTabletServerInfos = getAliveTabletServerInfos();
+ TabletServerInfo[] server = new TabletServerInfo[aliveTabletServerInfos.size()];
+ Iterator iterator = aliveTabletServerInfos.iterator();
+ for (int i = 0; i < aliveTabletServerInfos.size(); i++) {
+ server[i] = iterator.next();
+ }
+ return server;
+ }
}
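
A small caller-side sketch: the default getLiveServers() above simply copies getAliveTabletServerInfos() into an array, which gives callers a stably indexable view, for example for round-robin assignment (the counter handling here is illustrative and assumes at least one alive server).

import com.alibaba.fluss.cluster.TabletServerInfo;
import com.alibaba.fluss.server.metadata.ServerMetadataCache;

public class LiveServerSketch {
    static int pickServerRoundRobin(ServerMetadataCache metadataCache, int counter) {
        // One array copy per call; the copy does not change while we index into it.
        TabletServerInfo[] liveServers = metadataCache.getLiveServers();
        return liveServers[counter % liveServers.length].getId();
    }
}
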
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataCacheImpl.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataCacheImpl.java
deleted file mode 100644
index af40c4bba7..0000000000
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataCacheImpl.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.server.metadata;
-
-import com.alibaba.fluss.metadata.PhysicalTablePath;
-
-import java.util.HashMap;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
-
-/** The default implement of {@link ServerMetadataCache}. */
-public class ServerMetadataCacheImpl extends AbstractServerMetadataCache {
- private final Lock bucketMetadataLock = new ReentrantLock();
-
- public ServerMetadataCacheImpl() {
- super();
- }
-
- @Override
- public void updateClusterMetadata(ClusterMetadataInfo clusterMetadataInfo) {
- inLock(
- bucketMetadataLock,
- () -> {
- // 1. update coordinator server.
- ServerInfo coordinatorServer =
- clusterMetadataInfo.getCoordinatorServer().orElse(null);
-
- // 2. Update the alive table servers. We always use the new alive table servers
- // to replace the old alive table servers.
- HashMap newAliveTableServers = new HashMap<>();
- Set aliveTabletServers =
- clusterMetadataInfo.getAliveTabletServers();
- for (ServerInfo tabletServer : aliveTabletServers) {
- newAliveTableServers.put(tabletServer.id(), tabletServer);
- }
-
- clusterMetadata = new ServerCluster(coordinatorServer, newAliveTableServers);
- });
- }
-
- @Override
- public void upsertTableBucketMetadata(Long tableId, PhysicalTablePath physicalTablePath) {
- inLock(
- bucketMetadataLock,
- () -> {
- tableBucketMetadata.put(tableId, physicalTablePath);
- });
- }
-}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataSnapshot.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataSnapshot.java
new file mode 100644
index 0000000000..59a57ba72e
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/ServerMetadataSnapshot.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.metadata;
+
+import com.alibaba.fluss.cluster.Cluster;
+import com.alibaba.fluss.cluster.ServerNode;
+import com.alibaba.fluss.cluster.TabletServerInfo;
+import com.alibaba.fluss.metadata.PhysicalTablePath;
+import com.alibaba.fluss.metadata.TablePath;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+/**
+ * An immutable representation of the full set of server nodes in the Fluss cluster. Every
+ * ServerMetadataSnapshot instance is immutable, and updates (performed under a lock) replace the
+ * value with a completely new one. This means reads (which are not under any lock) need to grab
+ * the value of the holding field ONCE and retain that read copy for the duration of their
+ * operation; multiple reads of this value risk getting different snapshots.
+ *
+ * Compared to {@link Cluster}, it includes all the endpoints of the server nodes.
+ */
+public class ServerMetadataSnapshot {
+ private final @Nullable ServerInfo coordinatorServer;
+ private final Map aliveTabletServers;
+ private final Map tableIdByPath;
+ private final Map pathByTableId;
+ // partition table.
+ private final Map partitionIdByPath;
+ private final Map partitionNameById;
+
+ // TODO add detail metadata for table and partition, trace
+ // by: https://github.com/alibaba/fluss/issues/900
+
+ public ServerMetadataSnapshot(
+ @Nullable ServerInfo coordinatorServer,
+ Map aliveTabletServers,
+ Map tableIdByPath,
+ Map pathByTableId,
+ Map partitionIdByPath) {
+ this.coordinatorServer = coordinatorServer;
+ this.aliveTabletServers = Collections.unmodifiableMap(aliveTabletServers);
+
+ this.tableIdByPath = Collections.unmodifiableMap(tableIdByPath);
+ this.pathByTableId = Collections.unmodifiableMap(pathByTableId);
+
+ this.partitionIdByPath = Collections.unmodifiableMap(partitionIdByPath);
+ Map tempPartitionNameById = new HashMap<>();
+ partitionIdByPath.forEach(
+ ((physicalTablePath, partitionId) ->
+ tempPartitionNameById.put(
+ partitionId, physicalTablePath.getPartitionName())));
+ this.partitionNameById = Collections.unmodifiableMap(tempPartitionNameById);
+ }
+
+ /** Create an empty snapshot instance with no nodes and no table or partition mappings. */
+ public static ServerMetadataSnapshot empty() {
+ return new ServerMetadataSnapshot(
+ null,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Collections.emptyMap());
+ }
+
+ public ServerNode getCoordinatorServer(String listenerName) {
+ return coordinatorServer == null ? null : coordinatorServer.node(listenerName);
+ }
+
+ public Optional getAliveTabletServersById(int serverId, String listenerName) {
+ return (aliveTabletServers == null || !aliveTabletServers.containsKey(serverId))
+ ? Optional.empty()
+ : Optional.ofNullable(aliveTabletServers.get(serverId).node(listenerName));
+ }
+
+ public Map getAliveTabletServers(String listenerName) {
+ Map serverNodes = new HashMap<>();
+ for (Map.Entry entry : aliveTabletServers.entrySet()) {
+ ServerNode serverNode = entry.getValue().node(listenerName);
+ if (serverNode != null) {
+ serverNodes.put(entry.getKey(), serverNode);
+ }
+ }
+ return serverNodes;
+ }
+
+ public Set getAliveTabletServerInfos() {
+ Set tabletServerInfos = new HashSet<>();
+ aliveTabletServers
+ .values()
+ .forEach(
+ serverInfo ->
+ tabletServerInfos.add(
+ new TabletServerInfo(serverInfo.id(), serverInfo.rack())));
+ return Collections.unmodifiableSet(tabletServerInfos);
+ }
+
+ public OptionalLong getTableId(TablePath tablePath) {
+ Long tableId = tableIdByPath.get(tablePath);
+ return tableId == null ? OptionalLong.empty() : OptionalLong.of(tableId);
+ }
+
+ public Optional getTablePath(long tableId) {
+ return Optional.ofNullable(pathByTableId.get(tableId));
+ }
+
+ public Map getTableIdByPath() {
+ return tableIdByPath;
+ }
+
+ /** Get the partition id for this partition. */
+ public Optional getPartitionId(PhysicalTablePath physicalTablePath) {
+ return Optional.ofNullable(partitionIdByPath.get(physicalTablePath));
+ }
+
+ public Optional getPartitionName(long partitionId) {
+ return Optional.ofNullable(partitionNameById.get(partitionId));
+ }
+
+ public Map getPartitionIdByPath() {
+ return partitionIdByPath;
+ }
+}
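
A sketch of building and querying a snapshot (map key and value types inferred from the accessors; the database, table and ids are illustrative): the constructor wraps the maps as unmodifiable and derives the id-to-name partition index from partitionIdByPath, so all reads taken from one captured snapshot stay mutually consistent.

import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.server.metadata.ServerMetadataSnapshot;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ServerMetadataSnapshotSketch {
    public static void main(String[] args) {
        TablePath orders = TablePath.of("my_db", "orders");

        Map<TablePath, Long> tableIdByPath = new HashMap<>();
        tableIdByPath.put(orders, 42L);
        Map<Long, TablePath> pathByTableId = new HashMap<>();
        pathByTableId.put(42L, orders);
        Map<PhysicalTablePath, Long> partitionIdByPath = new HashMap<>();
        partitionIdByPath.put(PhysicalTablePath.of(orders, "p2024"), 7L);

        // No coordinator and no alive tablet servers in this toy snapshot.
        ServerMetadataSnapshot snapshot = new ServerMetadataSnapshot(
                null, Collections.emptyMap(), tableIdByPath, pathByTableId, partitionIdByPath);

        snapshot.getTableId(orders);        // OptionalLong.of(42)
        snapshot.getTablePath(42L);         // Optional.of(orders)
        snapshot.getPartitionName(7L);      // Optional.of("p2024"), derived from partitionIdByPath
    }
}
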
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadata.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadata.java
new file mode 100644
index 0000000000..c3473ee373
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadata.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.metadata;
+
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePath;
+
+import java.util.List;
+
+/** This entity is used to describe the table metadata. */
+public class TableMetadata {
+
+ /**
+ * The tablePath that marks an already deleted table. It is used in the UpdateMetadata request to
+ * indicate that the table has been deleted but a residual tableId remains in ZooKeeper. In this
+ * case, tablet servers need to clear the metadata of this tableId.
+ */
+ public static final TablePath DELETED_TABLE_PATH = TablePath.of("__UNKNOWN__", "__delete__");
+
+ /**
+ * The table id that marks an already deleted table. It is used in the UpdateMetadata request to
+ * indicate that the table has been deleted, and tablet servers need to clear the metadata of
+ * this table.
+ */
+ public static final Long DELETED_TABLE_ID = -2L;
+
+ private final TableInfo tableInfo;
+
+ /**
+ * For a partitioned table, this list is always empty. The detailed partition metadata is stored
+ * in {@link PartitionMetadata}. By doing this, we avoid repeatedly sending the tableInfo when
+ * creating/dropping partitions.
+ *
+ * Note: Before updating partition metadata, we must make sure the tableInfo of the
+ * corresponding partitioned table has already been updated.
+ */
+ private final List bucketMetadataList;
+
+ public TableMetadata(TableInfo tableInfo, List bucketMetadataList) {
+ this.tableInfo = tableInfo;
+ this.bucketMetadataList = bucketMetadataList;
+ }
+
+ public TableInfo getTableInfo() {
+ return tableInfo;
+ }
+
+ public List getBucketMetadataList() {
+ return bucketMetadataList;
+ }
+}
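
A hedged sketch of how the javadoc above plays out for a partitioned table: the TableMetadata entry ships with an empty bucket list while the buckets travel with each PartitionMetadata entry, so creating or dropping a partition does not re-send the TableInfo. The partition name, replica ids and leader assignment are illustrative.

import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.server.metadata.BucketMetadata;
import com.alibaba.fluss.server.metadata.ClusterMetadata;
import com.alibaba.fluss.server.metadata.PartitionMetadata;
import com.alibaba.fluss.server.metadata.ServerInfo;
import com.alibaba.fluss.server.metadata.TableMetadata;

import java.util.Arrays;
import java.util.Collections;
import java.util.Set;

public class PartitionedTableUpdateSketch {
    static ClusterMetadata describeNewPartition(
            ServerInfo coordinator,
            Set<ServerInfo> aliveTabletServers,
            TableInfo partitionedTable,
            long newPartitionId) {
        // The table entry carries no buckets for a partitioned table.
        TableMetadata tableEntry = new TableMetadata(partitionedTable, Collections.emptyList());
        // The buckets of the new partition are described by its PartitionMetadata entry.
        PartitionMetadata partitionEntry = new PartitionMetadata(
                partitionedTable.getTableId(),
                "p2024",
                newPartitionId,
                Collections.singletonList(new BucketMetadata(0, 1, 0, Arrays.asList(1, 2, 3))));
        return new ClusterMetadata(
                coordinator,
                aliveTabletServers,
                Collections.singletonList(tableEntry),
                Collections.singletonList(partitionEntry));
    }
}
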
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadataInfo.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadataInfo.java
deleted file mode 100644
index 6c3b4ec861..0000000000
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadataInfo.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.server.metadata;
-
-import com.alibaba.fluss.cluster.BucketLocation;
-import com.alibaba.fluss.metadata.TableInfo;
-
-import java.util.List;
-
-/** This entity used to describe the metadata table info. */
-public class TableMetadataInfo {
- private final TableInfo tableInfo;
-
- private final List bucketLocations;
-
- public TableMetadataInfo(TableInfo tableInfo, List bucketLocations) {
- this.tableInfo = tableInfo;
- this.bucketLocations = bucketLocations;
- }
-
- public TableInfo getTableInfo() {
- return tableInfo;
- }
-
- public List getBucketLocations() {
- return bucketLocations;
- }
-}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCache.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCache.java
new file mode 100644
index 0000000000..587c07d123
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCache.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.metadata;
+
+import com.alibaba.fluss.cluster.ServerNode;
+import com.alibaba.fluss.cluster.TabletServerInfo;
+import com.alibaba.fluss.metadata.PhysicalTablePath;
+import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.server.tablet.TabletServer;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID;
+import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_NAME;
+import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID;
+import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_PATH;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
+
+/** The implementation of {@link ServerMetadataCache} for the {@link TabletServer}. */
+public class TabletServerMetadataCache implements ServerMetadataCache {
+
+ private final Lock metadataLock = new ReentrantLock();
+
+ /**
+ * This is the cache state. Every ServerMetadataSnapshot instance is immutable, and updates
+ * (performed under a lock) replace the value with a completely new one. This means reads (which
+ * are not under any lock) need to grab the value of this field ONCE and retain that read copy
+ * for the duration of their operation.
+ *
+ * Multiple reads of this value risk getting different snapshots.
+ */
+ @GuardedBy("metadataLock")
+ private volatile ServerMetadataSnapshot serverMetadataSnapshot;
+
+ public TabletServerMetadataCache() {
+ this.serverMetadataSnapshot = ServerMetadataSnapshot.empty();
+ }
+
+ @Override
+ public boolean isAliveTabletServer(int serverId) {
+ Set tabletServerInfoList =
+ serverMetadataSnapshot.getAliveTabletServerInfos();
+ for (TabletServerInfo tabletServer : tabletServerInfoList) {
+ if (tabletServer.getId() == serverId) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Optional getTabletServer(int serverId, String listenerName) {
+ return serverMetadataSnapshot.getAliveTabletServersById(serverId, listenerName);
+ }
+
+ @Override
+ public Map getAllAliveTabletServers(String listenerName) {
+ return serverMetadataSnapshot.getAliveTabletServers(listenerName);
+ }
+
+ @Override
+ public @Nullable ServerNode getCoordinatorServer(String listenerName) {
+ return serverMetadataSnapshot.getCoordinatorServer(listenerName);
+ }
+
+ @Override
+ public Set getAliveTabletServerInfos() {
+ return serverMetadataSnapshot.getAliveTabletServerInfos();
+ }
+
+ public Optional getTablePath(long tableId) {
+ return serverMetadataSnapshot.getTablePath(tableId);
+ }
+
+ public Optional getPartitionName(long partitionId) {
+ return serverMetadataSnapshot.getPartitionName(partitionId);
+ }
+
+ public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
+ inLock(
+ metadataLock,
+ () -> {
+ // 1. update coordinator server.
+ ServerInfo coordinatorServer = clusterMetadata.getCoordinatorServer();
+
+ // 2. Update the alive table servers. We always use the new alive table servers
+ // to replace the old alive table servers.
+ Map newAliveTableServers = new HashMap<>();
+ Set aliveTabletServers = clusterMetadata.getAliveTabletServers();
+ for (ServerInfo tabletServer : aliveTabletServers) {
+ newAliveTableServers.put(tabletServer.id(), tabletServer);
+ }
+
+ // 3. update table metadata. Always partial update.
+ // TODO Currently, it only updates the tableIdByPath, we need to update all the
+ // table metadata. Trace by: https://github.com/alibaba/fluss/issues/900
+ Map tableIdByPath =
+ new HashMap<>(serverMetadataSnapshot.getTableIdByPath());
+ for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
+ TableInfo tableInfo = tableMetadata.getTableInfo();
+ TablePath tablePath = tableInfo.getTablePath();
+ long tableId = tableInfo.getTableId();
+ if (tableId == DELETED_TABLE_ID) {
+ tableIdByPath.remove(tablePath);
+ } else if (tablePath == DELETED_TABLE_PATH) {
+ serverMetadataSnapshot
+ .getTablePath(tableId)
+ .ifPresent(tableIdByPath::remove);
+ } else {
+ tableIdByPath.put(tablePath, tableId);
+ }
+ }
+
+ Map newPathByTableId = new HashMap<>();
+ tableIdByPath.forEach(
+ ((tablePath, tableId) -> newPathByTableId.put(tableId, tablePath)));
+
+ // 4. update partition metadata. Always partial update.
+ // TODO Currently, it only updates the partitionIdByPath, we need to update all
+ // the partition metadata. Trace by: https://github.com/alibaba/fluss/issues/900
+ Map partitionIdByPath =
+ new HashMap<>(serverMetadataSnapshot.getPartitionIdByPath());
+ for (PartitionMetadata partitionMetadata :
+ clusterMetadata.getPartitionMetadataList()) {
+ long tableId = partitionMetadata.getTableId();
+ TablePath tablePath = newPathByTableId.get(tableId);
+ String partitionName = partitionMetadata.getPartitionName();
+ PhysicalTablePath physicalTablePath =
+ PhysicalTablePath.of(tablePath, partitionName);
+ long partitionId = partitionMetadata.getPartitionId();
+ if (partitionId == DELETED_PARTITION_ID) {
+ partitionIdByPath.remove(physicalTablePath);
+ } else if (partitionName.equals(DELETED_PARTITION_NAME)) {
+ serverMetadataSnapshot
+ .getPartitionName(partitionId)
+ .ifPresent(
+ pName ->
+ partitionIdByPath.remove(
+ PhysicalTablePath.of(
+ tablePath, pName)));
+ } else {
+ partitionIdByPath.put(physicalTablePath, partitionId);
+ }
+ }
+
+ serverMetadataSnapshot =
+ new ServerMetadataSnapshot(
+ coordinatorServer,
+ newAliveTableServers,
+ tableIdByPath,
+ newPathByTableId,
+ partitionIdByPath);
+ });
+ }
+}
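
A sketch of the deletion path handled above: an entry whose partitionId is DELETED_PARTITION_ID tells updateClusterMetadata to drop that partition's path-to-id mapping before the new immutable snapshot is swapped in. The partition name is illustrative, and the owning table is assumed to already be present in the cache.

import com.alibaba.fluss.server.metadata.ClusterMetadata;
import com.alibaba.fluss.server.metadata.PartitionMetadata;
import com.alibaba.fluss.server.metadata.ServerInfo;
import com.alibaba.fluss.server.metadata.TabletServerMetadataCache;

import java.util.Collections;
import java.util.Set;

public class DroppedPartitionUpdateSketch {
    static void notifyPartitionDropped(
            TabletServerMetadataCache cache,
            ServerInfo coordinator,
            Set<ServerInfo> aliveTabletServers,
            long tableId) {
        // DELETED_PARTITION_ID marks "this partition is gone, clear its mapping from the cache".
        PartitionMetadata dropped = new PartitionMetadata(
                tableId, "p2024", PartitionMetadata.DELETED_PARTITION_ID, Collections.emptyList());
        ClusterMetadata update = new ClusterMetadata(
                coordinator,
                aliveTabletServers,
                Collections.emptyList(),
                Collections.singletonList(dropped));
        cache.updateClusterMetadata(update);
    }
}
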
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java
index c437ac77a7..1de0ec7d2e 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java
@@ -77,7 +77,8 @@
import com.alibaba.fluss.server.log.LogTablet;
import com.alibaba.fluss.server.log.checkpoint.OffsetCheckpointFile;
import com.alibaba.fluss.server.log.remote.RemoteLogManager;
-import com.alibaba.fluss.server.metadata.ServerMetadataCache;
+import com.alibaba.fluss.server.metadata.ClusterMetadata;
+import com.alibaba.fluss.server.metadata.TabletServerMetadataCache;
import com.alibaba.fluss.server.metrics.group.BucketMetricGroup;
import com.alibaba.fluss.server.metrics.group.PhysicalTableMetricGroup;
import com.alibaba.fluss.server.metrics.group.TabletServerMetricGroup;
@@ -117,7 +118,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -144,7 +144,7 @@ public class ReplicaManager {
@GuardedBy("replicaStateChangeLock")
private final Map allReplicas = MapUtils.newConcurrentHashMap();
- private final ServerMetadataCache metadataCache;
+ private final TabletServerMetadataCache metadataCache;
private final Lock replicaStateChangeLock = new ReentrantLock();
/**
@@ -189,7 +189,7 @@ public ReplicaManager(
KvManager kvManager,
ZooKeeperClient zkClient,
int serverId,
- ServerMetadataCache metadataCache,
+ TabletServerMetadataCache metadataCache,
RpcClient rpcClient,
CoordinatorGateway coordinatorGateway,
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
@@ -222,7 +222,7 @@ public ReplicaManager(
KvManager kvManager,
ZooKeeperClient zkClient,
int serverId,
- ServerMetadataCache metadataCache,
+ TabletServerMetadataCache metadataCache,
RpcClient rpcClient,
CoordinatorGateway coordinatorGateway,
CompletedKvSnapshotCommitter completedKvSnapshotCommitter,
@@ -328,8 +328,7 @@ private int writerIdCount() {
public void becomeLeaderOrFollower(
int requestCoordinatorEpoch,
List notifyLeaderAndIsrDataList,
- Consumer> responseCallback,
- BiConsumer leaderBucketCallback) {
+ Consumer> responseCallback) {
Map result = new HashMap<>();
inLock(
replicaStateChangeLock,
@@ -345,8 +344,6 @@ public void becomeLeaderOrFollower(
boolean becomeLeader = validateAndGetIsBecomeLeader(data);
if (becomeLeader) {
replicasToBeLeader.add(data);
- leaderBucketCallback.accept(
- tb.getTableId(), data.getPhysicalTablePath());
} else {
replicasToBeFollower.add(data);
}
@@ -371,6 +368,16 @@ public void becomeLeaderOrFollower(
responseCallback.accept(new ArrayList<>(result.values()));
}
+ public void maybeUpdateMetadataCache(int coordinatorEpoch, ClusterMetadata clusterMetadata) {
+ inLock(
+ replicaStateChangeLock,
+ () -> {
+ // check or apply coordinator epoch.
+ validateAndApplyCoordinatorEpoch(coordinatorEpoch, "updateMetadataCache");
+ metadataCache.updateClusterMetadata(clusterMetadata);
+ });
+ }
+
/**
* Append log records to leader replicas of the buckets, and wait for them to be replicated to
* other replicas.
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
index c737749829..6f6f4e5642 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
@@ -38,8 +38,7 @@
import com.alibaba.fluss.server.kv.snapshot.DefaultCompletedKvSnapshotCommitter;
import com.alibaba.fluss.server.log.LogManager;
import com.alibaba.fluss.server.log.remote.RemoteLogManager;
-import com.alibaba.fluss.server.metadata.ServerMetadataCache;
-import com.alibaba.fluss.server.metadata.ServerMetadataCacheImpl;
+import com.alibaba.fluss.server.metadata.TabletServerMetadataCache;
import com.alibaba.fluss.server.metrics.ServerMetricUtils;
import com.alibaba.fluss.server.metrics.group.TabletServerMetricGroup;
import com.alibaba.fluss.server.replica.ReplicaManager;
@@ -121,7 +120,7 @@ public class TabletServer extends ServerBase {
private TabletServerMetricGroup tabletServerMetricGroup;
@GuardedBy("lock")
- private ServerMetadataCache metadataCache;
+ private TabletServerMetadataCache metadataCache;
@GuardedBy("lock")
private LogManager logManager;
@@ -178,7 +177,7 @@ protected void startServices() throws Exception {
this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
- this.metadataCache = new ServerMetadataCacheImpl();
+ this.metadataCache = new TabletServerMetadataCache();
this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
scheduler.startup();
@@ -202,7 +201,7 @@ protected void startServices() throws Exception {
CoordinatorGateway coordinatorGateway =
GatewayClientProxy.createGatewayProxy(
- () -> metadataCache.getCoordinatorServer(interListenerName).get(),
+ () -> metadataCache.getCoordinatorServer(interListenerName),
rpcClient,
CoordinatorGateway.class);
@@ -410,6 +409,11 @@ public int getServerId() {
return serverId;
}
+ @VisibleForTesting
+ public TabletServerMetadataCache getMetadataCache() {
+ return metadataCache;
+ }
+
@VisibleForTesting
public ReplicaManager getReplicaManager() {
return replicaManager;
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java
index 492d05c3fd..8533bddfff 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java
@@ -20,7 +20,6 @@
import com.alibaba.fluss.exception.AuthorizationException;
import com.alibaba.fluss.exception.UnknownTableOrBucketException;
import com.alibaba.fluss.fs.FileSystem;
-import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.record.KvRecordBatch;
@@ -56,6 +55,8 @@
import com.alibaba.fluss.rpc.messages.PutKvResponse;
import com.alibaba.fluss.rpc.messages.StopReplicaRequest;
import com.alibaba.fluss.rpc.messages.StopReplicaResponse;
+import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest;
+import com.alibaba.fluss.rpc.messages.UpdateMetadataResponse;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.rpc.protocol.Errors;
import com.alibaba.fluss.security.acl.OperationType;
@@ -67,7 +68,7 @@
import com.alibaba.fluss.server.entity.NotifyLeaderAndIsrData;
import com.alibaba.fluss.server.log.FetchParams;
import com.alibaba.fluss.server.log.ListOffsetsParam;
-import com.alibaba.fluss.server.metadata.ServerMetadataCache;
+import com.alibaba.fluss.server.metadata.TabletServerMetadataCache;
import com.alibaba.fluss.server.replica.ReplicaManager;
import com.alibaba.fluss.server.utils.ServerRpcMessageUtils;
import com.alibaba.fluss.server.zk.ZooKeeperClient;
@@ -78,6 +79,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
@@ -85,6 +87,7 @@
import static com.alibaba.fluss.security.acl.OperationType.READ;
import static com.alibaba.fluss.security.acl.OperationType.WRITE;
+import static com.alibaba.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH;
import static com.alibaba.fluss.server.log.FetchParams.DEFAULT_MAX_WAIT_MS_WHEN_MIN_BYTES_ENABLE;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getFetchLogData;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getListOffsetsData;
@@ -96,6 +99,7 @@
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getPutKvData;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getStopReplicaData;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getTargetColumns;
+import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getUpdateMetadataRequestData;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeInitWriterResponse;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeLimitScanResponse;
@@ -114,13 +118,14 @@ public final class TabletService extends RpcServiceBase implements TabletServerG
private final String serviceName;
private final ReplicaManager replicaManager;
+ private final TabletServerMetadataCache metadataCache;
public TabletService(
int serverId,
FileSystem remoteFileSystem,
ZooKeeperClient zkClient,
ReplicaManager replicaManager,
- ServerMetadataCache metadataCache,
+ TabletServerMetadataCache metadataCache,
MetadataManager metadataManager,
@Nullable Authorizer authorizer) {
super(
@@ -132,6 +137,7 @@ public TabletService(
authorizer);
this.serviceName = "server-" + serverId;
this.replicaManager = replicaManager;
+ this.metadataCache = metadataCache;
}
@Override
@@ -151,9 +157,7 @@ public CompletableFuture produceLog(ProduceLogRequest reques
request.getTimeoutMs(),
request.getAcks(),
produceLogData,
- bucketResponseMap -> {
- response.complete(makeProduceLogResponse(bucketResponseMap));
- });
+ bucketResponseMap -> response.complete(makeProduceLogResponse(bucketResponseMap)));
return response;
}
@@ -211,9 +215,7 @@ public CompletableFuture putKv(PutKvRequest request) {
request.getAcks(),
putKvData,
getTargetColumns(request),
- bucketResponse -> {
- response.complete(makePutKvResponse(bucketResponse));
- });
+ bucketResponse -> response.complete(makePutKvResponse(bucketResponse)));
return response;
}
@@ -231,9 +233,7 @@ public CompletableFuture lookup(LookupRequest request) {
CompletableFuture response = new CompletableFuture<>();
replicaManager.lookups(
lookupData,
- value -> {
- response.complete(makeLookupResponse(value, errorResponseMap));
- });
+ value -> response.complete(makeLookupResponse(value, errorResponseMap)));
return response;
}
@@ -251,9 +251,7 @@ public CompletableFuture prefixLookup(PrefixLookupRequest
CompletableFuture response = new CompletableFuture<>();
replicaManager.prefixLookups(
prefixLookupData,
- value -> {
- response.complete(makePrefixLookupResponse(value, errorResponseMap));
- });
+ value -> response.complete(makePrefixLookupResponse(value, errorResponseMap)));
return response;
}
@@ -281,11 +279,21 @@ public CompletableFuture notifyLeaderAndIsr(
replicaManager.becomeLeaderOrFollower(
notifyLeaderAndIsrRequest.getCoordinatorEpoch(),
notifyLeaderAndIsrRequestData,
- result -> response.complete(makeNotifyLeaderAndIsrResponse(result)),
- metadataCache::upsertTableBucketMetadata);
+ result -> response.complete(makeNotifyLeaderAndIsrResponse(result)));
return response;
}
+ @Override
+ public CompletableFuture updateMetadata(UpdateMetadataRequest request) {
+ int coordinatorEpoch =
+ request.hasCoordinatorEpoch()
+ ? request.getCoordinatorEpoch()
+ : INITIAL_COORDINATOR_EPOCH;
+ replicaManager.maybeUpdateMetadataCache(
+ coordinatorEpoch, getUpdateMetadataRequestData(request));
+ return CompletableFuture.completedFuture(new UpdateMetadataResponse());
+ }
+
@Override
public CompletableFuture stopReplica(
StopReplicaRequest stopReplicaRequest) {
@@ -351,18 +359,17 @@ public CompletableFuture notifyLakeTableOffset(
}
private void authorizeTable(OperationType operationType, long tableId) {
- PhysicalTablePath tablePath = metadataCache.getTablePath(tableId);
+ TablePath tablePath = metadataCache.getTablePath(tableId).orElse(null);
if (tablePath == null) {
throw new UnknownTableOrBucketException(
String.format(
- "This server %s does not host this table ID %s. This may happen when the table metadata cache in the server is not updated yet.",
+ "This server %s does not know this table ID %s. This may happen when the table "
+ + "metadata cache in the server is not updated yet.",
serviceName, tableId));
}
if (authorizer != null
&& !authorizer.isAuthorized(
- currentSession(),
- operationType,
- Resource.table(tablePath.getTablePath()))) {
+ currentSession(), operationType, Resource.table(tablePath))) {
throw new AuthorizationException(
String.format(
"No permission to %s table %s in database %s",
@@ -410,8 +417,8 @@ private Map authorizeRequestData(
requestData.forEach(
(tableBucket, bucketData) -> {
long tableId = tableBucket.getTableId();
- PhysicalTablePath tablePath = metadataCache.getTablePath(tableId);
- if (tablePath == null) {
+ Optional tablePathOpt = metadataCache.getTablePath(tableId);
+ if (!tablePathOpt.isPresent()) {
errorResponseMap.put(
tableBucket,
resultCreator.apply(
@@ -419,10 +426,11 @@ private Map authorizeRequestData(
new ApiError(
Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION,
String.format(
- "This server %s does not host this table ID %s. "
+ "This server %s does not know this table ID %s. "
+ "This may happen when the table metadata cache in the server is not updated yet.",
serviceName, tableId))));
} else if (!filteredTableIds.contains(tableId)) {
+ TablePath tablePath = tablePathOpt.get();
errorResponseMap.put(
tableBucket,
resultCreator.apply(
@@ -448,13 +456,13 @@ private Set filterAuthorizedTables(
.distinct()
.filter(
tableId -> {
- PhysicalTablePath tablePath = metadataCache.getTablePath(tableId);
- return tablePath != null
+ Optional tablePathOpt = metadataCache.getTablePath(tableId);
+ return tablePathOpt.isPresent()
&& authorizer != null
&& authorizer.isAuthorized(
currentSession(),
operationType,
- Resource.table(tablePath.getTablePath()));
+ Resource.table(tablePathOpt.get()));
})
.collect(Collectors.toSet());
}
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
index 5f8fc7a7cf..f0eb531ea4 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
@@ -19,12 +19,15 @@
import com.alibaba.fluss.cluster.Endpoint;
import com.alibaba.fluss.cluster.ServerNode;
import com.alibaba.fluss.cluster.ServerType;
+import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.fs.FsPath;
import com.alibaba.fluss.fs.token.ObtainedSecurityToken;
import com.alibaba.fluss.metadata.PartitionSpec;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.record.BytesViewLogRecords;
import com.alibaba.fluss.record.DefaultKvRecordBatch;
@@ -65,6 +68,7 @@
import com.alibaba.fluss.rpc.messages.ListPartitionInfosResponse;
import com.alibaba.fluss.rpc.messages.LookupRequest;
import com.alibaba.fluss.rpc.messages.LookupResponse;
+import com.alibaba.fluss.rpc.messages.MetadataResponse;
import com.alibaba.fluss.rpc.messages.NotifyKvSnapshotOffsetRequest;
import com.alibaba.fluss.rpc.messages.NotifyLakeTableOffsetRequest;
import com.alibaba.fluss.rpc.messages.NotifyLeaderAndIsrRequest;
@@ -75,6 +79,7 @@
import com.alibaba.fluss.rpc.messages.PbAdjustIsrReqForTable;
import com.alibaba.fluss.rpc.messages.PbAdjustIsrRespForBucket;
import com.alibaba.fluss.rpc.messages.PbAdjustIsrRespForTable;
+import com.alibaba.fluss.rpc.messages.PbBucketMetadata;
import com.alibaba.fluss.rpc.messages.PbCreateAclRespInfo;
import com.alibaba.fluss.rpc.messages.PbDropAclsFilterResult;
import com.alibaba.fluss.rpc.messages.PbDropAclsMatchingAcl;
@@ -93,6 +98,7 @@
import com.alibaba.fluss.rpc.messages.PbNotifyLakeTableOffsetReqForBucket;
import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket;
import com.alibaba.fluss.rpc.messages.PbNotifyLeaderAndIsrRespForBucket;
+import com.alibaba.fluss.rpc.messages.PbPartitionMetadata;
import com.alibaba.fluss.rpc.messages.PbPartitionSpec;
import com.alibaba.fluss.rpc.messages.PbPhysicalTablePath;
import com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket;
@@ -107,6 +113,7 @@
import com.alibaba.fluss.rpc.messages.PbStopReplicaReqForBucket;
import com.alibaba.fluss.rpc.messages.PbStopReplicaRespForBucket;
import com.alibaba.fluss.rpc.messages.PbTableBucket;
+import com.alibaba.fluss.rpc.messages.PbTableMetadata;
import com.alibaba.fluss.rpc.messages.PbTablePath;
import com.alibaba.fluss.rpc.messages.PbValue;
import com.alibaba.fluss.rpc.messages.PbValueList;
@@ -139,7 +146,11 @@
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
import com.alibaba.fluss.server.kv.snapshot.KvSnapshotHandle;
+import com.alibaba.fluss.server.metadata.BucketMetadata;
+import com.alibaba.fluss.server.metadata.ClusterMetadata;
+import com.alibaba.fluss.server.metadata.PartitionMetadata;
import com.alibaba.fluss.server.metadata.ServerInfo;
+import com.alibaba.fluss.server.metadata.TableMetadata;
import com.alibaba.fluss.server.zk.data.BucketSnapshot;
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
@@ -148,12 +159,15 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -212,9 +226,54 @@ public static ServerNode toServerNode(PbServerNode pbServerNode, ServerType serv
pbServerNode.hasRack() ? pbServerNode.getRack() : null);
}
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
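+ /** Builds a MetadataResponse from the coordinator node, alive tablet servers and table/partition metadata. */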
+ public static MetadataResponse buildMetadataResponse(
+ @Nullable ServerNode coordinatorServer,
+ Set<ServerNode> aliveTabletServers,
+ List<TableMetadata> tableMetadataList,
+ List<PartitionMetadata> partitionMetadataList) {
+ MetadataResponse metadataResponse = new MetadataResponse();
+
+ if (coordinatorServer != null) {
+ metadataResponse
+ .setCoordinatorServer()
+ .setNodeId(coordinatorServer.id())
+ .setHost(coordinatorServer.host())
+ .setPort(coordinatorServer.port());
+ }
+
+ List<PbServerNode> pbServerNodeList = new ArrayList<>();
+ for (ServerNode serverNode : aliveTabletServers) {
+ PbServerNode pbServerNode =
+ new PbServerNode()
+ .setNodeId(serverNode.id())
+ .setHost(serverNode.host())
+ .setPort(serverNode.port());
+ if (serverNode.rack() != null) {
+ pbServerNode.setRack(serverNode.rack());
+ }
+ pbServerNodeList.add(pbServerNode);
+ }
+
+ List<PbTableMetadata> pbTableMetadataList = new ArrayList<>();
+ tableMetadataList.forEach(
+ tableMetadata -> pbTableMetadataList.add(toPbTableMetadata(tableMetadata)));
+
+ List<PbPartitionMetadata> pbPartitionMetadataList = new ArrayList<>();
+ partitionMetadataList.forEach(
+ partitionMetadata ->
+ pbPartitionMetadataList.add(toPbPartitionMetadata(partitionMetadata)));
+
+ metadataResponse.addAllTabletServers(pbServerNodeList);
+ metadataResponse.addAllTableMetadatas(pbTableMetadataList);
+ metadataResponse.addAllPartitionMetadatas(pbPartitionMetadataList);
+ return metadataResponse;
+ }
+
public static UpdateMetadataRequest makeUpdateMetadataRequest(
- Optional<ServerInfo> coordinatorServer, Set<ServerInfo> aliveTableServers) {
+ @Nullable ServerInfo coordinatorServer,
+ Set<ServerInfo> aliveTableServers,
+ List<TableMetadata> tableMetadataList,
+ List<PartitionMetadata> partitionMetadataList) {
UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest();
Set<PbServerNode> aliveTableServerNodes = new HashSet<>();
for (ServerInfo serverInfo : aliveTableServers) {
@@ -232,18 +291,183 @@ public static UpdateMetadataRequest makeUpdateMetadataRequest(
aliveTableServerNodes.add(pbTabletServerNode);
}
updateMetadataRequest.addAllTabletServers(aliveTableServerNodes);
- coordinatorServer.map(
- node ->
- updateMetadataRequest
- .setCoordinatorServer()
- .setNodeId(node.id())
- .setListeners(Endpoint.toListenersString(node.endpoints()))
- // for backward compatibility for versions <= 0.6
- .setHost(node.endpoints().get(0).getHost())
- .setPort(node.endpoints().get(0).getPort()));
+
+ if (coordinatorServer != null) {
+ updateMetadataRequest
+ .setCoordinatorServer()
+ .setNodeId(coordinatorServer.id())
+ .setListeners(Endpoint.toListenersString(coordinatorServer.endpoints()))
+ // for backward compatibility for versions <= 0.6
+ .setHost(coordinatorServer.endpoints().get(0).getHost())
+ .setPort(coordinatorServer.endpoints().get(0).getPort());
+ }
+
+ List<PbTableMetadata> pbTableMetadataList = new ArrayList<>();
+ tableMetadataList.forEach(
+ tableMetadata -> pbTableMetadataList.add(toPbTableMetadata(tableMetadata)));
+
+ List<PbPartitionMetadata> pbPartitionMetadataList = new ArrayList<>();
+ partitionMetadataList.forEach(
+ partitionMetadata ->
+ pbPartitionMetadataList.add(toPbPartitionMetadata(partitionMetadata)));
+ updateMetadataRequest.addAllTableMetadatas(pbTableMetadataList);
+ updateMetadataRequest.addAllPartitionMetadatas(pbPartitionMetadataList);
+
return updateMetadataRequest;
}
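+ /** Converts an UpdateMetadataRequest into the server-side ClusterMetadata view. */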
+ public static ClusterMetadata getUpdateMetadataRequestData(UpdateMetadataRequest request) {
+ ServerInfo coordinatorServer = null;
+ if (request.hasCoordinatorServer()) {
+ PbServerNode pbCoordinatorServer = request.getCoordinatorServer();
+ List<Endpoint> endpoints =
+ pbCoordinatorServer.hasListeners()
+ ? Endpoint.fromListenersString(pbCoordinatorServer.getListeners())
+ // backward compatible with old version that doesn't have listeners
+ : Collections.singletonList(
+ new Endpoint(
+ pbCoordinatorServer.getHost(),
+ pbCoordinatorServer.getPort(),
+ // TODO: maybe use internal listener name from conf
+ ConfigOptions.INTERNAL_LISTENER_NAME.defaultValue()));
+ coordinatorServer =
+ new ServerInfo(
+ pbCoordinatorServer.getNodeId(),
+ pbCoordinatorServer.hasRack() ? pbCoordinatorServer.getRack() : null,
+ endpoints,
+ ServerType.COORDINATOR);
+ }
+
+ Set<ServerInfo> aliveTabletServers = new HashSet<>();
+ for (PbServerNode tabletServer : request.getTabletServersList()) {
+ List<Endpoint> endpoints =
+ tabletServer.hasListeners()
+ ? Endpoint.fromListenersString(tabletServer.getListeners())
+ // backward compatible with old version that doesn't have listeners
+ : Collections.singletonList(
+ new Endpoint(
+ tabletServer.getHost(),
+ tabletServer.getPort(),
+ // TODO: maybe use internal listener name from conf
+ ConfigOptions.INTERNAL_LISTENER_NAME.defaultValue()));
+ aliveTabletServers.add(
+ new ServerInfo(
+ tabletServer.getNodeId(),
+ tabletServer.hasRack() ? tabletServer.getRack() : null,
+ endpoints,
+ ServerType.TABLET_SERVER));
+ }
+
+ List<TableMetadata> tableMetadataList = new ArrayList<>();
+ request.getTableMetadatasList()
+ .forEach(tableMetadata -> tableMetadataList.add(toTableMetaData(tableMetadata)));
+
+ List<PartitionMetadata> partitionMetadataList = new ArrayList<>();
+ request.getPartitionMetadatasList()
+ .forEach(
+ partitionMetadata ->
+ partitionMetadataList.add(toPartitionMetadata(partitionMetadata)));
+
+ return new ClusterMetadata(
+ coordinatorServer, aliveTabletServers, tableMetadataList, partitionMetadataList);
+ }
+
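+ /** Converts a TableMetadata into its protobuf representation. */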
+ private static PbTableMetadata toPbTableMetadata(TableMetadata tableMetadata) {
+ TableInfo tableInfo = tableMetadata.getTableInfo();
+ PbTableMetadata pbTableMetadata =
+ new PbTableMetadata()
+ .setTableId(tableInfo.getTableId())
+ .setSchemaId(tableInfo.getSchemaId())
+ .setTableJson(tableInfo.toTableDescriptor().toJsonBytes())
+ .setCreatedTime(tableInfo.getCreatedTime())
+ .setModifiedTime(tableInfo.getModifiedTime());
+ TablePath tablePath = tableInfo.getTablePath();
+ pbTableMetadata
+ .setTablePath()
+ .setDatabaseName(tablePath.getDatabaseName())
+ .setTableName(tablePath.getTableName());
+ pbTableMetadata.addAllBucketMetadatas(
+ toPbBucketMetadata(tableMetadata.getBucketMetadataList()));
+ return pbTableMetadata;
+ }
+
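+ /** Converts a PartitionMetadata into its protobuf representation. */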
+ private static PbPartitionMetadata toPbPartitionMetadata(PartitionMetadata partitionMetadata) {
+ PbPartitionMetadata pbPartitionMetadata =
+ new PbPartitionMetadata()
+ .setTableId(partitionMetadata.getTableId())
+ .setPartitionId(partitionMetadata.getPartitionId())
+ .setPartitionName(partitionMetadata.getPartitionName());
+ pbPartitionMetadata.addAllBucketMetadatas(
+ toPbBucketMetadata(partitionMetadata.getBucketMetadataList()));
+ return pbPartitionMetadata;
+ }
+
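+ /** Converts bucket metadata to protobuf, setting leader id/epoch only when present. */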
+ private static List<PbBucketMetadata> toPbBucketMetadata(
+ List<BucketMetadata> bucketMetadataList) {
+ List<PbBucketMetadata> pbBucketMetadataList = new ArrayList<>();
+ for (BucketMetadata bucketMetadata : bucketMetadataList) {
+ PbBucketMetadata pbBucketMetadata =
+ new PbBucketMetadata().setBucketId(bucketMetadata.getBucketId());
+
+ OptionalInt leaderEpochOpt = bucketMetadata.getLeaderEpoch();
+ if (leaderEpochOpt.isPresent()) {
+ pbBucketMetadata.setLeaderEpoch(leaderEpochOpt.getAsInt());
+ }
+
+ OptionalInt leaderId = bucketMetadata.getLeaderId();
+ if (leaderId.isPresent()) {
+ pbBucketMetadata.setLeaderId(leaderId.getAsInt());
+ }
+
+ for (Integer replica : bucketMetadata.getReplicas()) {
+ pbBucketMetadata.addReplicaId(replica);
+ }
+
+ pbBucketMetadataList.add(pbBucketMetadata);
+ }
+ return pbBucketMetadataList;
+ }
+
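+ /** Rebuilds TableMetadata (table info plus bucket metadata) from its protobuf representation. */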
+ private static TableMetadata toTableMetaData(PbTableMetadata pbTableMetadata) {
+ TablePath tablePath = toTablePath(pbTableMetadata.getTablePath());
+ long tableId = pbTableMetadata.getTableId();
+ TableInfo tableInfo =
+ TableInfo.of(
+ tablePath,
+ tableId,
+ pbTableMetadata.getSchemaId(),
+ TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson()),
+ pbTableMetadata.getCreatedTime(),
+ pbTableMetadata.getModifiedTime());
+
+ List<BucketMetadata> bucketMetadata = new ArrayList<>();
+ for (PbBucketMetadata pbBucketMetadata : pbTableMetadata.getBucketMetadatasList()) {
+ bucketMetadata.add(toBucketMetadata(pbBucketMetadata));
+ }
+
+ return new TableMetadata(tableInfo, bucketMetadata);
+ }
+
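+ /** Rebuilds BucketMetadata from protobuf, mapping absent leader id/epoch to null. */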
+ private static BucketMetadata toBucketMetadata(PbBucketMetadata pbBucketMetadata) {
+ return new BucketMetadata(
+ pbBucketMetadata.getBucketId(),
+ pbBucketMetadata.hasLeaderId() ? pbBucketMetadata.getLeaderId() : null,
+ pbBucketMetadata.hasLeaderEpoch() ? pbBucketMetadata.getLeaderEpoch() : null,
+ Arrays.stream(pbBucketMetadata.getReplicaIds())
+ .boxed()
+ .collect(Collectors.toList()));
+ }
+
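+ /** Rebuilds PartitionMetadata from its protobuf representation. */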
+ private static PartitionMetadata toPartitionMetadata(PbPartitionMetadata pbPartitionMetadata) {
+ return new PartitionMetadata(
+ pbPartitionMetadata.getTableId(),
+ pbPartitionMetadata.getPartitionName(),
+ pbPartitionMetadata.getPartitionId(),
+ pbPartitionMetadata.getBucketMetadatasList().stream()
+ .map(ServerRpcMessageUtils::toBucketMetadata)
+ .collect(Collectors.toList()));
+ }
+
public static NotifyLeaderAndIsrRequest makeNotifyLeaderAndIsrRequest(
int coordinatorEpoch, Collection notifyLeaders) {
return new NotifyLeaderAndIsrRequest()
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
index fd38933871..530e8f0fa4 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/AutoPartitionManagerTest.java
@@ -24,7 +24,7 @@
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
-import com.alibaba.fluss.server.testutils.TestingMetadataCache;
+import com.alibaba.fluss.server.testutils.TestingServerMetadataCache;
import com.alibaba.fluss.server.zk.NOPErrorHandler;
import com.alibaba.fluss.server.zk.ZooKeeperClient;
import com.alibaba.fluss.server.zk.ZooKeeperExtension;
@@ -188,7 +188,7 @@ void testAddPartitionedTable(TestParams params) throws Exception {
AutoPartitionManager autoPartitionManager =
new AutoPartitionManager(
- new TestingMetadataCache(3),
+ new TestingServerMetadataCache(3),
new MetadataManager(zookeeperClient, new Configuration()),
new Configuration(),
clock,
@@ -270,7 +270,7 @@ void testMaxPartitions() throws Exception {
AutoPartitionManager autoPartitionManager =
new AutoPartitionManager(
- new TestingMetadataCache(3),
+ new TestingServerMetadataCache(3),
metadataManager,
new Configuration(),
clock,
@@ -346,7 +346,7 @@ void testAutoCreateDayPartitionShouldJitter() throws Exception {
new ManuallyTriggeredScheduledExecutorService();
AutoPartitionManager autoPartitionManager =
new AutoPartitionManager(
- new TestingMetadataCache(3),
+ new TestingServerMetadataCache(3),
metadataManager,
new Configuration(),
clock,
@@ -406,7 +406,7 @@ void testMaxBucketNum() throws Exception {
AutoPartitionManager autoPartitionManager =
new AutoPartitionManager(
- new TestingMetadataCache(3),
+ new TestingServerMetadataCache(3),
metadataManager,
config,
clock,
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorChannelManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorChannelManagerTest.java
index 8614adad63..c4bbf81d49 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorChannelManagerTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorChannelManagerTest.java
@@ -31,7 +31,6 @@
import java.time.Duration;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeUpdateMetadataRequest;
@@ -91,7 +90,11 @@ private void checkSendRequest(
AtomicInteger sendFlag = new AtomicInteger(0);
// we use update metadata request to test for simplicity
UpdateMetadataRequest updateMetadataRequest =
- makeUpdateMetadataRequest(Optional.empty(), Collections.emptySet());
+ makeUpdateMetadataRequest(
+ null,
+ Collections.emptySet(),
+ Collections.emptyList(),
+ Collections.emptyList());
coordinatorChannelManager.sendRequest(
targetServerId,
updateMetadataRequest,
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index d0a64230f3..78400bbe8f 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -44,15 +44,15 @@
import com.alibaba.fluss.server.entity.CommitRemoteLogManifestData;
import com.alibaba.fluss.server.kv.snapshot.CompletedSnapshot;
import com.alibaba.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
+import com.alibaba.fluss.server.metadata.CoordinatorMetadataCache;
import com.alibaba.fluss.server.metadata.ServerInfo;
-import com.alibaba.fluss.server.metadata.ServerMetadataCache;
-import com.alibaba.fluss.server.metadata.ServerMetadataCacheImpl;
import com.alibaba.fluss.server.metrics.group.TestingMetricGroups;
import com.alibaba.fluss.server.tablet.TestTabletServerGateway;
import com.alibaba.fluss.server.zk.NOPErrorHandler;
import com.alibaba.fluss.server.zk.ZooKeeperClient;
import com.alibaba.fluss.server.zk.ZooKeeperExtension;
import com.alibaba.fluss.server.zk.data.BucketAssignment;
+import com.alibaba.fluss.server.zk.data.CoordinatorAddress;
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
import com.alibaba.fluss.server.zk.data.PartitionAssignment;
import com.alibaba.fluss.server.zk.data.TableAssignment;
@@ -130,11 +130,11 @@ class CoordinatorEventProcessorTest {
private CoordinatorEventProcessor eventProcessor;
private final String defaultDatabase = "db";
- private ServerMetadataCache serverMetadataCache;
private TestCoordinatorChannelManager testCoordinatorChannelManager;
private AutoPartitionManager autoPartitionManager;
private LakeTableTieringManager lakeTableTieringManager;
private CompletedSnapshotStoreManager completedSnapshotStoreManager;
+ private CoordinatorMetadataCache serverMetadataCache;
@BeforeAll
static void baseBeforeAll() throws Exception {
@@ -143,6 +143,12 @@ static void baseBeforeAll() throws Exception {
.getCustomExtension()
.getZooKeeperClient(NOPErrorHandler.INSTANCE);
metadataManager = new MetadataManager(zookeeperClient, new Configuration());
+
+ // register coordinator server
+ zookeeperClient.registerCoordinatorLeader(
+ new CoordinatorAddress(
+ "2", Endpoint.fromListenersString("CLIENT://localhost:10012")));
+
// register 3 tablet servers
for (int i = 0; i < 3; i++) {
zookeeperClient.registerTabletServer(
@@ -157,7 +163,7 @@ static void baseBeforeAll() throws Exception {
@BeforeEach
void beforeEach() throws IOException {
- serverMetadataCache = new ServerMetadataCacheImpl();
+ serverMetadataCache = new CoordinatorMetadataCache();
// set a test channel manager for the context
testCoordinatorChannelManager = new TestCoordinatorChannelManager();
autoPartitionManager =
@@ -759,6 +765,7 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor() {
zookeeperClient,
serverMetadataCache,
testCoordinatorChannelManager,
+ new CoordinatorContext(),
autoPartitionManager,
lakeTableTieringManager,
TestingMetricGroups.COORDINATOR_METRICS,
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java
index 8a6eeeca01..f3e8155cde 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerITCase.java
@@ -41,6 +41,7 @@
import com.alibaba.fluss.rpc.gateway.AdminGateway;
import com.alibaba.fluss.rpc.gateway.AdminReadOnlyGateway;
import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
+import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
import com.alibaba.fluss.rpc.messages.GetTableInfoResponse;
import com.alibaba.fluss.rpc.messages.GetTableSchemaRequest;
import com.alibaba.fluss.rpc.messages.ListDatabasesRequest;
@@ -486,11 +487,16 @@ void testMetadata(boolean isCoordinatorServer) throws Exception {
// now, assuming we send update metadata request to the server,
// we should get the same response
- gateway.updateMetadata(
- makeUpdateMetadataRequest(
- Optional.of(coordinatorServerInfo),
- new HashSet<>(tabletServerInfos)))
- .get();
+ if (!isCoordinatorServer) {
+ ((TabletServerGateway) gateway)
+ .updateMetadata(
+ makeUpdateMetadataRequest(
+ coordinatorServerInfo,
+ new HashSet<>(tabletServerInfos),
+ Collections.emptyList(),
+ Collections.emptyList()))
+ .get();
+ }
// test lookup metadata from internal view
@@ -619,11 +625,14 @@ void testMetadataCompatibility(boolean isCoordinatorServer) throws Exception {
// now, assuming we send update metadata request to the server,
// we should get the same response
- gateway.updateMetadata(
- makeLegacyUpdateMetadataRequest(
- Optional.of(coordinatorServerInfo),
- new HashSet<>(tabletServerInfos)))
- .get();
+ if (!isCoordinatorServer) {
+ ((TabletServerGateway) gateway)
+ .updateMetadata(
+ makeLegacyUpdateMetadataRequest(
+ Optional.of(coordinatorServerInfo),
+ new HashSet<>(tabletServerInfos)))
+ .get();
+ }
// test lookup metadata
AdminGateway adminGatewayForClient = getAdminGateway();
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerTest.java
index b9886fa75e..1f0283a684 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TableManagerTest.java
@@ -20,6 +20,7 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableBucketReplica;
+import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.server.coordinator.event.CoordinatorEvent;
import com.alibaba.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent;
import com.alibaba.fluss.server.coordinator.event.TestingEventManager;
@@ -54,6 +55,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK;
@@ -110,7 +113,8 @@ private void initTableManager() {
Configuration conf = new Configuration();
conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data");
CoordinatorRequestBatch coordinatorRequestBatch =
- new CoordinatorRequestBatch(testCoordinatorChannelManager, testingEventManager);
+ new CoordinatorRequestBatch(
+ testCoordinatorChannelManager, testingEventManager, coordinatorContext);
ReplicaStateMachine replicaStateMachine =
new ReplicaStateMachine(
coordinatorContext, coordinatorRequestBatch, zookeeperClient);
@@ -143,6 +147,14 @@ void testCreateTable() throws Exception {
.build();
long tableId = DATA1_TABLE_ID;
+ coordinatorContext.putTableInfo(
+ TableInfo.of(
+ DATA1_TABLE_PATH,
+ tableId,
+ 0,
+ DATA1_TABLE_DESCRIPTOR,
+ System.currentTimeMillis(),
+ System.currentTimeMillis()));
tableManager.onCreateNewTable(DATA1_TABLE_PATH, tableId, assignment);
// all replica should be online
@@ -158,6 +170,14 @@ void testDeleteTable() throws Exception {
TableAssignment assignment = createAssignment();
zookeeperClient.registerTableAssignment(tableId, assignment);
+ coordinatorContext.putTableInfo(
+ TableInfo.of(
+ DATA1_TABLE_PATH_PK,
+ tableId,
+ 0,
+ DATA1_TABLE_DESCRIPTOR_PK,
+ System.currentTimeMillis(),
+ System.currentTimeMillis()));
tableManager.onCreateNewTable(DATA1_TABLE_PATH_PK, tableId, assignment);
// now, delete the created table
@@ -186,6 +206,14 @@ void testResumeDeletionAfterRestart() throws Exception {
TableAssignment assignment = createAssignment();
zookeeperClient.registerTableAssignment(tableId, assignment);
+ coordinatorContext.putTableInfo(
+ TableInfo.of(
+ DATA1_TABLE_PATH,
+ tableId,
+ 0,
+ DATA1_TABLE_DESCRIPTOR,
+ System.currentTimeMillis(),
+ System.currentTimeMillis()));
tableManager.onCreateNewTable(DATA1_TABLE_PATH, tableId, assignment);
// now, delete the created table/partition
@@ -225,6 +253,14 @@ void testCreateAndDropPartition() throws Exception {
TableAssignment assignment = TableAssignment.builder().build();
zookeeperClient.registerTableAssignment(tableId, assignment);
+ coordinatorContext.putTableInfo(
+ TableInfo.of(
+ DATA1_TABLE_PATH,
+ tableId,
+ 0,
+ DATA1_TABLE_DESCRIPTOR,
+ System.currentTimeMillis(),
+ System.currentTimeMillis()));
tableManager.onCreateNewTable(DATA1_TABLE_PATH, tableId, assignment);
PartitionAssignment partitionAssignment =
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
index cabd2b04e2..f79f4bf932 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -75,8 +75,6 @@
import com.alibaba.fluss.rpc.messages.MetadataResponse;
import com.alibaba.fluss.rpc.messages.TableExistsRequest;
import com.alibaba.fluss.rpc.messages.TableExistsResponse;
-import com.alibaba.fluss.rpc.messages.UpdateMetadataRequest;
-import com.alibaba.fluss.rpc.messages.UpdateMetadataResponse;
import com.alibaba.fluss.rpc.protocol.ApiError;
import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
import com.alibaba.fluss.server.entity.CommitRemoteLogManifestData;
@@ -196,11 +194,6 @@ public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) {
throw new UnsupportedOperationException();
}
- @Override
- public CompletableFuture<UpdateMetadataResponse> updateMetadata(UpdateMetadataRequest request) {
- throw new UnsupportedOperationException();
- }
-
@Override
public CompletableFuture<GetLatestKvSnapshotsResponse> getLatestKvSnapshots(
GetLatestKvSnapshotsRequest request) {
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
index 3e08d68148..0c8cd24fea 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
@@ -21,6 +21,7 @@
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableBucketReplica;
+import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.rpc.RpcClient;
import com.alibaba.fluss.rpc.metrics.TestingClientMetricGroup;
import com.alibaba.fluss.server.coordinator.CoordinatorChannelManager;
@@ -50,6 +51,8 @@
import java.util.List;
import java.util.Map;
+import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.NewReplica;
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
import static com.alibaba.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
@@ -192,6 +195,15 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception {
// put the replica to online
long tableId = 1;
+ coordinatorContext.putTableInfo(
+ TableInfo.of(
+ DATA1_TABLE_PATH,
+ tableId,
+ 0,
+ DATA1_TABLE_DESCRIPTOR,
+ System.currentTimeMillis(),
+ System.currentTimeMillis()));
+ coordinatorContext.putTablePath(tableId, DATA1_TABLE_PATH);
TableBucket tableBucket = new TableBucket(tableId, 0);
for (int i = 0; i < 3; i++) {
TableBucketReplica replica = new TableBucketReplica(tableBucket, i);
@@ -200,6 +212,7 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception {
// put leader and isr
LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0);
zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
+ coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr);
// set replica 1 to offline
@@ -242,7 +255,8 @@ private ReplicaStateMachine createReplicaStateMachine(CoordinatorContext coordin
TestingClientMetricGroup.newInstance())),
(event) -> {
// do nothing
- }),
+ },
+ coordinatorContext),
zookeeperClient);
}
@@ -272,7 +286,8 @@ private ReplicaStateMachine createReplicaStateMachine(
deleteReplicaResultForBucket.succeeded());
}
}
- }),
+ },
+ coordinatorContext),
zookeeperClient);
}
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
index aae4e1e645..b741139ec5 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
@@ -19,6 +19,7 @@
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.rpc.RpcClient;
import com.alibaba.fluss.rpc.metrics.TestingClientMetricGroup;
@@ -32,8 +33,7 @@
import com.alibaba.fluss.server.coordinator.MetadataManager;
import com.alibaba.fluss.server.coordinator.TestCoordinatorChannelManager;
import com.alibaba.fluss.server.coordinator.event.CoordinatorEventManager;
-import com.alibaba.fluss.server.metadata.ServerMetadataCache;
-import com.alibaba.fluss.server.metadata.ServerMetadataCacheImpl;
+import com.alibaba.fluss.server.metadata.CoordinatorMetadataCache;
import com.alibaba.fluss.server.metrics.group.TestingMetricGroups;
import com.alibaba.fluss.server.zk.NOPErrorHandler;
import com.alibaba.fluss.server.zk.ZooKeeperClient;
@@ -55,6 +55,7 @@
import java.util.Optional;
import java.util.concurrent.Executors;
+import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NewBucket;
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.NonExistentBucket;
import static com.alibaba.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
@@ -71,11 +72,11 @@ class TableBucketStateMachineTest {
private static ZooKeeperClient zookeeperClient;
private static CoordinatorContext coordinatorContext;
- private ServerMetadataCache serverMetadataCache;
private TestCoordinatorChannelManager testCoordinatorChannelManager;
private CoordinatorRequestBatch coordinatorRequestBatch;
private AutoPartitionManager autoPartitionManager;
private LakeTableTieringManager lakeTableTieringManager;
+ private CoordinatorMetadataCache serverMetadataCache;
@BeforeAll
static void baseBeforeAll() {
@@ -97,8 +98,9 @@ void beforeEach() throws IOException {
testCoordinatorChannelManager,
event -> {
// do nothing
- });
- serverMetadataCache = new ServerMetadataCacheImpl();
+ },
+ coordinatorContext);
+ serverMetadataCache = new CoordinatorMetadataCache();
autoPartitionManager =
new AutoPartitionManager(
serverMetadataCache,
@@ -183,6 +185,14 @@ void testStateChangeToOnline() throws Exception {
// init a table bucket assignment to coordinator context
long tableId = 4;
TableBucket tableBucket = new TableBucket(tableId, 0);
+ coordinatorContext.putTableInfo(
+ TableInfo.of(
+ fakeTablePath,
+ tableId,
+ 0,
+ DATA1_TABLE_DESCRIPTOR,
+ System.currentTimeMillis(),
+ System.currentTimeMillis()));
coordinatorContext.putTablePath(tableId, fakeTablePath);
coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2));
coordinatorContext.putBucketState(tableBucket, NewBucket);
@@ -247,7 +257,8 @@ void testStateChangeToOnline() throws Exception {
new CoordinatorEventManager(
coordinatorEventProcessor, TestingMetricGroups.COORDINATOR_METRICS);
coordinatorRequestBatch =
- new CoordinatorRequestBatch(testCoordinatorChannelManager, eventManager);
+ new CoordinatorRequestBatch(
+ testCoordinatorChannelManager, eventManager, coordinatorContext);
tableBucketStateMachine =
new TableBucketStateMachine(
coordinatorContext, coordinatorRequestBatch, zookeeperClient);
diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/ServerMetadataCacheImplTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/CoordinatorMetadataCacheTest.java
similarity index 80%
rename from fluss-server/src/test/java/com/alibaba/fluss/server/metadata/ServerMetadataCacheImplTest.java
rename to fluss-server/src/test/java/com/alibaba/fluss/server/metadata/CoordinatorMetadataCacheTest.java
index f4e5015953..be76d70267 100644
--- a/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/ServerMetadataCacheImplTest.java
+++ b/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/CoordinatorMetadataCacheTest.java
@@ -18,33 +18,36 @@
import com.alibaba.fluss.cluster.Endpoint;
import com.alibaba.fluss.cluster.ServerType;
+import com.alibaba.fluss.cluster.TabletServerInfo;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashSet;
-import java.util.Optional;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link com.alibaba.fluss.server.metadata.ServerMetadataCacheImpl}. */
-public class ServerMetadataCacheImplTest {
- private ServerMetadataCache serverMetadataCache;
+/** Test for {@link CoordinatorMetadataCache}. */
+public class CoordinatorMetadataCacheTest {
+ private CoordinatorMetadataCache serverMetadataCache;
+
private ServerInfo coordinatorServer;
private Set