diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 246dbe89e3..387589d7a6 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -569,9 +569,7 @@ message PbTableMetadata { repeated PbBucketMetadata bucket_metadata = 5; required int64 created_time = 6; required int64 modified_time = 7; - - // TODO add a new filed 'deleted_table' to indicate this table is deleted in UpdateMetadataRequest. - // trace by: https://github.com/alibaba/fluss/issues/981 + required bool deleted_marker = 8; } message PbPartitionMetadata { @@ -580,6 +578,7 @@ message PbPartitionMetadata { required string partition_name = 2; required int64 partition_id = 3; repeated PbBucketMetadata bucket_metadata = 4; + required bool deleted_marker = 5; } message PbBucketMetadata { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java index 1595f55a6c..397428f31d 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -45,6 +45,7 @@ import com.alibaba.fluss.server.metadata.TableMetadata; import com.alibaba.fluss.server.zk.data.LakeTableSnapshot; import com.alibaba.fluss.server.zk.data.LeaderAndIsr; +import com.alibaba.fluss.utils.types.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,9 +62,7 @@ import java.util.Set; import java.util.stream.Collectors; -import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID; import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_NAME; -import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID; import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_PATH; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getNotifyLeaderAndIsrResponseData; import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeNotifyBucketLeaderAndIsr; @@ -616,9 +615,12 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { List tableMetadataList = new ArrayList<>(); updateMetadataRequestBucketMap.forEach( (tableId, bucketMetadataList) -> { - TableInfo tableInfo = getTableInfo(tableId); + Tuple2 tableInfoTuple = getTableInfo(tableId); + TableInfo tableInfo = tableInfoTuple.f1; if (tableInfo != null) { - tableMetadataList.add(new TableMetadata(tableInfo, bucketMetadataList)); + tableMetadataList.add( + new TableMetadata( + tableInfo, bucketMetadataList, tableInfoTuple.f0)); } }); @@ -640,7 +642,8 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { tableId, DELETED_PARTITION_NAME, partitionId, - kvEntry.getValue()); + kvEntry.getValue(), + true); } else { throw new IllegalStateException( "Partition name is null for partition " + partitionId); @@ -650,19 +653,20 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { new PartitionMetadata( tableId, partitionName, - partitionQueuedForDeletion - ? DELETED_PARTITION_ID - : partitionId, - kvEntry.getValue()); + partitionId, + kvEntry.getValue(), + partitionQueuedForDeletion); } // table partitionMetadataList.add(partitionMetadata); } // no bucket metadata, use empty metadata list - TableInfo tableInfo = getTableInfo(tableId); + Tuple2 tableInfoTuple = getTableInfo(tableId); + TableInfo tableInfo = tableInfoTuple.f1; if (tableInfo != null) { tableMetadataList.add( - new TableMetadata(getTableInfo(tableId), Collections.emptyList())); + new TableMetadata( + tableInfo, Collections.emptyList(), tableInfoTuple.f0)); } }); @@ -675,32 +679,34 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { partitionMetadataList); } - @Nullable - private TableInfo getTableInfo(long tableId) { + private Tuple2 getTableInfo(long tableId) { TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId); boolean tableQueuedForDeletion = coordinatorContext.isTableQueuedForDeletion(tableId); if (tableInfo == null) { if (tableQueuedForDeletion) { - return TableInfo.of( - DELETED_TABLE_PATH, tableId, 0, EMPTY_TABLE_DESCRIPTOR, -1L, -1L); + tableInfo = + TableInfo.of( + DELETED_TABLE_PATH, tableId, 0, EMPTY_TABLE_DESCRIPTOR, -1L, -1L); } else { // it may happen that the table is dropped, but the partition still exists // when coordinator restarts, it won't consider it as deleted table, // and will still send partition bucket metadata to tablet server after startup, // which will fail into this code patch, not throw exception, just return null. // TODO: FIX ME, it shouldn't come into here - return null; + tableInfo = null; } } else { - return tableQueuedForDeletion - ? TableInfo.of( - tableInfo.getTablePath(), - DELETED_TABLE_ID, - 0, - EMPTY_TABLE_DESCRIPTOR, - -1L, - -1L) - : tableInfo; + tableInfo = + tableQueuedForDeletion + ? TableInfo.of( + tableInfo.getTablePath(), + tableId, + 0, + EMPTY_TABLE_DESCRIPTOR, + -1L, + -1L) + : tableInfo; } + return Tuple2.of(tableQueuedForDeletion, tableInfo); } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadata.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadata.java index 20778f1100..852b5770a6 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadata.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/PartitionMetadata.java @@ -27,7 +27,7 @@ public class PartitionMetadata { * to identify this partition already deleted, but there is an partitionId residual in * zookeeper. In this case, tabletServers need to clear the metadata of this partition. */ - public static final String DELETED_PARTITION_NAME = "__delete__"; + public static final String DELETED_PARTITION_NAME = "__delete_marker__"; /** * The already delete partition id. This partition id will be used in UpdateMetadata request to @@ -40,16 +40,27 @@ public class PartitionMetadata { private final String partitionName; private final long partitionId; private final List bucketMetadataList; + private final boolean deletedMarker; public PartitionMetadata( long tableId, String partitionName, long partitionId, List bucketMetadataList) { + this(tableId, partitionName, partitionId, bucketMetadataList, false); + } + + public PartitionMetadata( + long tableId, + String partitionName, + long partitionId, + List bucketMetadataList, + boolean deletedMarker) { this.tableId = tableId; this.partitionName = partitionName; this.partitionId = partitionId; this.bucketMetadataList = bucketMetadataList; + this.deletedMarker = deletedMarker; } public long getTableId() { @@ -67,4 +78,8 @@ public long getPartitionId() { public List getBucketMetadataList() { return bucketMetadataList; } + + public boolean isDeletedMarker() { + return deletedMarker; + } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadata.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadata.java index fe8771de37..df3fb071d8 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadata.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TableMetadata.java @@ -31,7 +31,8 @@ public class TableMetadata { * identify this tablePath already deleted, but there is an tableId residual in zookeeper. In * this case, tabletServers need to clear the metadata of this tableId. */ - public static final TablePath DELETED_TABLE_PATH = TablePath.of("__UNKNOWN__", "__delete__"); + public static final TablePath DELETED_TABLE_PATH = + TablePath.of("__UNKNOWN__", "__delete_marker__"); /** * The already deleted table id. This table id will be used in UpdateMetadata request to @@ -42,6 +43,8 @@ public class TableMetadata { private final TableInfo tableInfo; + private final boolean deletedMarker; + /** * For partition table, this list is always empty. The detail partition metadata is stored in * {@link PartitionMetadata}. By doing this, we can avoid to repeat send tableInfo when @@ -53,8 +56,14 @@ public class TableMetadata { private final List bucketMetadataList; public TableMetadata(TableInfo tableInfo, List bucketMetadataList) { + this(tableInfo, bucketMetadataList, false); + } + + public TableMetadata( + TableInfo tableInfo, List bucketMetadataList, boolean deletedMarker) { this.tableInfo = tableInfo; this.bucketMetadataList = bucketMetadataList; + this.deletedMarker = deletedMarker; } public TableInfo getTableInfo() { @@ -65,6 +74,10 @@ public List getBucketMetadataList() { return bucketMetadataList; } + public boolean isDeletedMarker() { + return deletedMarker; + } + @Override public String toString() { return "TableMetadata{" diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCache.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCache.java index 7607a0d5b7..37bfb3a0d4 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCache.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCache.java @@ -43,10 +43,6 @@ import static com.alibaba.fluss.server.RpcServiceBase.getPartitionMetadataFromZk; import static com.alibaba.fluss.server.RpcServiceBase.getTableMetadataFromZk; -import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID; -import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_NAME; -import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID; -import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_PATH; import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock; /** The implement of {@link ServerMetadataCache} for {@link TabletServer}. */ @@ -185,12 +181,7 @@ public void updateClusterMetadata(ClusterMetadata clusterMetadata) { TableInfo tableInfo = tableMetadata.getTableInfo(); TablePath tablePath = tableInfo.getTablePath(); long tableId = tableInfo.getTableId(); - if (tableId == DELETED_TABLE_ID) { - Long removedTableId = tableIdByPath.remove(tablePath); - if (removedTableId != null) { - bucketMetadataMapForTables.remove(removedTableId); - } - } else if (tablePath == DELETED_TABLE_PATH) { + if (tableMetadata.isDeletedMarker()) { serverMetadataSnapshot .getTablePath(tableId) .ifPresent(tableIdByPath::remove); @@ -229,12 +220,8 @@ public void updateClusterMetadata(ClusterMetadata clusterMetadata) { PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName); long partitionId = partitionMetadata.getPartitionId(); - if (partitionId == DELETED_PARTITION_ID) { - Long removedPartitionId = partitionIdByPath.remove(physicalTablePath); - if (removedPartitionId != null) { - bucketMetadataMapForPartitions.remove(removedPartitionId); - } - } else if (partitionName.equals(DELETED_PARTITION_NAME)) { + // TODO partitionMetadata.isDeletedMarker() + if (partitionMetadata.isDeletedMarker()) { serverMetadataSnapshot .getPhysicalTablePath(partitionId) .ifPresent(partitionIdByPath::remove); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java index 034163493c..73600bc858 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java @@ -156,6 +156,9 @@ import com.alibaba.fluss.server.zk.data.LakeTableSnapshot; import com.alibaba.fluss.server.zk.data.LeaderAndIsr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -183,6 +186,8 @@ */ public class ServerRpcMessageUtils { + private static final Logger log = LoggerFactory.getLogger(ServerRpcMessageUtils.class); + public static TablePath toTablePath(PbTablePath pbTablePath) { return new TablePath(pbTablePath.getDatabaseName(), pbTablePath.getTableName()); } @@ -387,6 +392,7 @@ private static PbTableMetadata toPbTableMetadata(TableMetadata tableMetadata) { .setTablePath() .setDatabaseName(tablePath.getDatabaseName()) .setTableName(tablePath.getTableName()); + pbTableMetadata.setDeletedMarker(tableMetadata.isDeletedMarker()); pbTableMetadata.addAllBucketMetadatas( toPbBucketMetadata(tableMetadata.getBucketMetadataList())); return pbTableMetadata; @@ -398,6 +404,7 @@ private static PbPartitionMetadata toPbPartitionMetadata(PartitionMetadata parti .setTableId(partitionMetadata.getTableId()) .setPartitionId(partitionMetadata.getPartitionId()) .setPartitionName(partitionMetadata.getPartitionName()); + pbPartitionMetadata.setDeletedMarker(partitionMetadata.isDeletedMarker()); pbPartitionMetadata.addAllBucketMetadatas( toPbBucketMetadata(partitionMetadata.getBucketMetadataList())); return pbPartitionMetadata; @@ -446,7 +453,7 @@ private static TableMetadata toTableMetaData(PbTableMetadata pbTableMetadata) { bucketMetadata.add(toBucketMetadata(pbBucketMetadata)); } - return new TableMetadata(tableInfo, bucketMetadata); + return new TableMetadata(tableInfo, bucketMetadata, pbTableMetadata.isDeletedMarker()); } private static BucketMetadata toBucketMetadata(PbBucketMetadata pbBucketMetadata) { @@ -466,7 +473,8 @@ private static PartitionMetadata toPartitionMetadata(PbPartitionMetadata pbParti pbPartitionMetadata.getPartitionId(), pbPartitionMetadata.getBucketMetadatasList().stream() .map(ServerRpcMessageUtils::toBucketMetadata) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + pbPartitionMetadata.isDeletedMarker()); } public static NotifyLeaderAndIsrRequest makeNotifyLeaderAndIsrRequest( diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCacheTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCacheTest.java index 5fddb02a28..d03c8aae78 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCacheTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/metadata/TabletServerMetadataCacheTest.java @@ -43,8 +43,6 @@ import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; -import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID; -import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID; import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.NO_LEADER; import static org.assertj.core.api.Assertions.assertThat; @@ -212,13 +210,13 @@ void testUpdateClusterMetadataRequest() { new TableMetadata( TableInfo.of( DATA1_TABLE_PATH, - DELETED_TABLE_ID, // mark this table as - // deletion. + DATA1_TABLE_ID, 1, DATA1_TABLE_DESCRIPTOR, System.currentTimeMillis(), System.currentTimeMillis()), - changedBucket1BucketMetadata)), + changedBucket1BucketMetadata, + true)), Collections.emptyList())); assertThat(serverMetadataCache.getTablePath(DATA1_TABLE_ID)).isEmpty(); @@ -232,9 +230,9 @@ void testUpdateClusterMetadataRequest() { new PartitionMetadata( partitionTableId, partitionName1, - DELETED_PARTITION_ID, // mark this partition as - // deletion. - Collections.emptyList())))); + partitionId1, + Collections.emptyList(), + true)))); assertThat(serverMetadataCache.getPhysicalTablePath(partitionId1)).isEmpty(); assertPartitionMetadataEquals( partitionId2, diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java index 6dd2ce4f49..37f5325ec6 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java @@ -107,8 +107,6 @@ import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static com.alibaba.fluss.record.TestData.EXPECTED_LOG_RESULTS_FOR_DATA_1_WITH_PK; import static com.alibaba.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH; -import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID; -import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID; import static com.alibaba.fluss.server.testutils.PartitionMetadataAssert.assertPartitionMetadata; import static com.alibaba.fluss.server.testutils.TableMetadataAssert.assertTableMetadata; import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.INITIAL_BUCKET_EPOCH; @@ -1584,12 +1582,13 @@ void testUpdateMetadata() throws Exception { new TableMetadata( TableInfo.of( nonePartitionTablePath, - DELETED_TABLE_ID, // mark as deleted. + nonePartitionTableId, 1, DATA1_TABLE_DESCRIPTOR, System.currentTimeMillis(), System.currentTimeMillis()), - Collections.emptyList())), + Collections.emptyList(), + true)), Collections.emptyList())); zkClient.deleteTable(nonePartitionTablePath); @@ -1614,8 +1613,9 @@ void testUpdateMetadata() throws Exception { new PartitionMetadata( partitionTableId, partitionName1, - DELETED_PARTITION_ID, // mark as deleted. - Collections.emptyList())))); + partitionId1, + Collections.emptyList(), + true)))); expectedPartitionNameById.put(partitionId1, null); expectedPartitionMetadataById.put(physicalTablePath1, null); assertUpdateMetadataEquals(