Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,7 @@ message PbTableMetadata {
repeated PbBucketMetadata bucket_metadata = 5;
required int64 created_time = 6;
required int64 modified_time = 7;

// TODO add a new filed 'deleted_table' to indicate this table is deleted in UpdateMetadataRequest.
// trace by: https://github.com/alibaba/fluss/issues/981
required bool deleted_marker = 8;
}

message PbPartitionMetadata {
Expand All @@ -580,6 +578,7 @@ message PbPartitionMetadata {
required string partition_name = 2;
required int64 partition_id = 3;
repeated PbBucketMetadata bucket_metadata = 4;
required bool deleted_marker = 5;
}

message PbBucketMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.alibaba.fluss.server.metadata.TableMetadata;
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;
import com.alibaba.fluss.utils.types.Tuple2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -61,9 +62,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID;
import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_NAME;
import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID;
import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_PATH;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getNotifyLeaderAndIsrResponseData;
import static com.alibaba.fluss.server.utils.ServerRpcMessageUtils.makeNotifyBucketLeaderAndIsr;
Expand Down Expand Up @@ -616,9 +615,12 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
List<TableMetadata> tableMetadataList = new ArrayList<>();
updateMetadataRequestBucketMap.forEach(
(tableId, bucketMetadataList) -> {
TableInfo tableInfo = getTableInfo(tableId);
Tuple2<Boolean, TableInfo> tableInfoTuple = getTableInfo(tableId);
TableInfo tableInfo = tableInfoTuple.f1;
if (tableInfo != null) {
tableMetadataList.add(new TableMetadata(tableInfo, bucketMetadataList));
tableMetadataList.add(
new TableMetadata(
tableInfo, bucketMetadataList, tableInfoTuple.f0));
}
});

Expand All @@ -640,7 +642,8 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
tableId,
DELETED_PARTITION_NAME,
partitionId,
kvEntry.getValue());
kvEntry.getValue(),
true);
} else {
throw new IllegalStateException(
"Partition name is null for partition " + partitionId);
Expand All @@ -650,19 +653,20 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
new PartitionMetadata(
tableId,
partitionName,
partitionQueuedForDeletion
? DELETED_PARTITION_ID
: partitionId,
kvEntry.getValue());
partitionId,
kvEntry.getValue(),
partitionQueuedForDeletion);
}
// table
partitionMetadataList.add(partitionMetadata);
}
// no bucket metadata, use empty metadata list
TableInfo tableInfo = getTableInfo(tableId);
Tuple2<Boolean, TableInfo> tableInfoTuple = getTableInfo(tableId);
TableInfo tableInfo = tableInfoTuple.f1;
if (tableInfo != null) {
tableMetadataList.add(
new TableMetadata(getTableInfo(tableId), Collections.emptyList()));
new TableMetadata(
tableInfo, Collections.emptyList(), tableInfoTuple.f0));
}
});

Expand All @@ -675,32 +679,34 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
partitionMetadataList);
}

@Nullable
private TableInfo getTableInfo(long tableId) {
private Tuple2<Boolean, TableInfo> getTableInfo(long tableId) {
TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId);
boolean tableQueuedForDeletion = coordinatorContext.isTableQueuedForDeletion(tableId);
if (tableInfo == null) {
if (tableQueuedForDeletion) {
return TableInfo.of(
DELETED_TABLE_PATH, tableId, 0, EMPTY_TABLE_DESCRIPTOR, -1L, -1L);
tableInfo =
TableInfo.of(
DELETED_TABLE_PATH, tableId, 0, EMPTY_TABLE_DESCRIPTOR, -1L, -1L);
} else {
// it may happen that the table is dropped, but the partition still exists
// when coordinator restarts, it won't consider it as deleted table,
// and will still send partition bucket metadata to tablet server after startup,
// which will fail into this code patch, not throw exception, just return null.
// TODO: FIX ME, it shouldn't come into here
return null;
tableInfo = null;
}
} else {
return tableQueuedForDeletion
? TableInfo.of(
tableInfo.getTablePath(),
DELETED_TABLE_ID,
0,
EMPTY_TABLE_DESCRIPTOR,
-1L,
-1L)
: tableInfo;
tableInfo =
tableQueuedForDeletion
? TableInfo.of(
tableInfo.getTablePath(),
tableId,
0,
EMPTY_TABLE_DESCRIPTOR,
-1L,
-1L)
: tableInfo;
}
return Tuple2.of(tableQueuedForDeletion, tableInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class PartitionMetadata {
* to identify this partition already deleted, but there is an partitionId residual in
* zookeeper. In this case, tabletServers need to clear the metadata of this partition.
*/
public static final String DELETED_PARTITION_NAME = "__delete__";
public static final String DELETED_PARTITION_NAME = "__delete_marker__";

/**
* The already delete partition id. This partition id will be used in UpdateMetadata request to
Expand All @@ -40,16 +40,27 @@ public class PartitionMetadata {
private final String partitionName;
private final long partitionId;
private final List<BucketMetadata> bucketMetadataList;
private final boolean deletedMarker;

public PartitionMetadata(
long tableId,
String partitionName,
long partitionId,
List<BucketMetadata> bucketMetadataList) {
this(tableId, partitionName, partitionId, bucketMetadataList, false);
}

public PartitionMetadata(
long tableId,
String partitionName,
long partitionId,
List<BucketMetadata> bucketMetadataList,
boolean deletedMarker) {
this.tableId = tableId;
this.partitionName = partitionName;
this.partitionId = partitionId;
this.bucketMetadataList = bucketMetadataList;
this.deletedMarker = deletedMarker;
}

public long getTableId() {
Expand All @@ -67,4 +78,8 @@ public long getPartitionId() {
public List<BucketMetadata> getBucketMetadataList() {
return bucketMetadataList;
}

public boolean isDeletedMarker() {
return deletedMarker;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public class TableMetadata {
* identify this tablePath already deleted, but there is an tableId residual in zookeeper. In
* this case, tabletServers need to clear the metadata of this tableId.
*/
public static final TablePath DELETED_TABLE_PATH = TablePath.of("__UNKNOWN__", "__delete__");
public static final TablePath DELETED_TABLE_PATH =
TablePath.of("__UNKNOWN__", "__delete_marker__");

/**
* The already deleted table id. This table id will be used in UpdateMetadata request to
Expand All @@ -42,6 +43,8 @@ public class TableMetadata {

private final TableInfo tableInfo;

private final boolean deletedMarker;

/**
* For partition table, this list is always empty. The detail partition metadata is stored in
* {@link PartitionMetadata}. By doing this, we can avoid to repeat send tableInfo when
Expand All @@ -53,8 +56,14 @@ public class TableMetadata {
private final List<BucketMetadata> bucketMetadataList;

public TableMetadata(TableInfo tableInfo, List<BucketMetadata> bucketMetadataList) {
this(tableInfo, bucketMetadataList, false);
}

public TableMetadata(
TableInfo tableInfo, List<BucketMetadata> bucketMetadataList, boolean deletedMarker) {
this.tableInfo = tableInfo;
this.bucketMetadataList = bucketMetadataList;
this.deletedMarker = deletedMarker;
}

public TableInfo getTableInfo() {
Expand All @@ -65,6 +74,10 @@ public List<BucketMetadata> getBucketMetadataList() {
return bucketMetadataList;
}

public boolean isDeletedMarker() {
return deletedMarker;
}

@Override
public String toString() {
return "TableMetadata{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@

import static com.alibaba.fluss.server.RpcServiceBase.getPartitionMetadataFromZk;
import static com.alibaba.fluss.server.RpcServiceBase.getTableMetadataFromZk;
import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID;
import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_NAME;
import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID;
import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_PATH;
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;

/** The implement of {@link ServerMetadataCache} for {@link TabletServer}. */
Expand Down Expand Up @@ -185,12 +181,7 @@ public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
TableInfo tableInfo = tableMetadata.getTableInfo();
TablePath tablePath = tableInfo.getTablePath();
long tableId = tableInfo.getTableId();
if (tableId == DELETED_TABLE_ID) {
Long removedTableId = tableIdByPath.remove(tablePath);
if (removedTableId != null) {
bucketMetadataMapForTables.remove(removedTableId);
}
} else if (tablePath == DELETED_TABLE_PATH) {
if (tableMetadata.isDeletedMarker()) {
serverMetadataSnapshot
.getTablePath(tableId)
.ifPresent(tableIdByPath::remove);
Expand Down Expand Up @@ -229,12 +220,8 @@ public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
PhysicalTablePath physicalTablePath =
PhysicalTablePath.of(tablePath, partitionName);
long partitionId = partitionMetadata.getPartitionId();
if (partitionId == DELETED_PARTITION_ID) {
Long removedPartitionId = partitionIdByPath.remove(physicalTablePath);
if (removedPartitionId != null) {
bucketMetadataMapForPartitions.remove(removedPartitionId);
}
} else if (partitionName.equals(DELETED_PARTITION_NAME)) {
// TODO partitionMetadata.isDeletedMarker()
if (partitionMetadata.isDeletedMarker()) {
serverMetadataSnapshot
.getPhysicalTablePath(partitionId)
.ifPresent(partitionIdByPath::remove);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@
import com.alibaba.fluss.server.zk.data.LakeTableSnapshot;
import com.alibaba.fluss.server.zk.data.LeaderAndIsr;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -183,6 +186,8 @@
*/
public class ServerRpcMessageUtils {

private static final Logger log = LoggerFactory.getLogger(ServerRpcMessageUtils.class);

public static TablePath toTablePath(PbTablePath pbTablePath) {
return new TablePath(pbTablePath.getDatabaseName(), pbTablePath.getTableName());
}
Expand Down Expand Up @@ -387,6 +392,7 @@ private static PbTableMetadata toPbTableMetadata(TableMetadata tableMetadata) {
.setTablePath()
.setDatabaseName(tablePath.getDatabaseName())
.setTableName(tablePath.getTableName());
pbTableMetadata.setDeletedMarker(tableMetadata.isDeletedMarker());
pbTableMetadata.addAllBucketMetadatas(
toPbBucketMetadata(tableMetadata.getBucketMetadataList()));
return pbTableMetadata;
Expand All @@ -398,6 +404,7 @@ private static PbPartitionMetadata toPbPartitionMetadata(PartitionMetadata parti
.setTableId(partitionMetadata.getTableId())
.setPartitionId(partitionMetadata.getPartitionId())
.setPartitionName(partitionMetadata.getPartitionName());
pbPartitionMetadata.setDeletedMarker(partitionMetadata.isDeletedMarker());
pbPartitionMetadata.addAllBucketMetadatas(
toPbBucketMetadata(partitionMetadata.getBucketMetadataList()));
return pbPartitionMetadata;
Expand Down Expand Up @@ -446,7 +453,7 @@ private static TableMetadata toTableMetaData(PbTableMetadata pbTableMetadata) {
bucketMetadata.add(toBucketMetadata(pbBucketMetadata));
}

return new TableMetadata(tableInfo, bucketMetadata);
return new TableMetadata(tableInfo, bucketMetadata, pbTableMetadata.isDeletedMarker());
}

private static BucketMetadata toBucketMetadata(PbBucketMetadata pbBucketMetadata) {
Expand All @@ -466,7 +473,8 @@ private static PartitionMetadata toPartitionMetadata(PbPartitionMetadata pbParti
pbPartitionMetadata.getPartitionId(),
pbPartitionMetadata.getBucketMetadatasList().stream()
.map(ServerRpcMessageUtils::toBucketMetadata)
.collect(Collectors.toList()));
.collect(Collectors.toList()),
pbPartitionMetadata.isDeletedMarker());
}

public static NotifyLeaderAndIsrRequest makeNotifyLeaderAndIsrRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_INFO;
import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH;
import static com.alibaba.fluss.server.metadata.PartitionMetadata.DELETED_PARTITION_ID;
import static com.alibaba.fluss.server.metadata.TableMetadata.DELETED_TABLE_ID;
import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.NO_LEADER;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -212,13 +210,13 @@ void testUpdateClusterMetadataRequest() {
new TableMetadata(
TableInfo.of(
DATA1_TABLE_PATH,
DELETED_TABLE_ID, // mark this table as
// deletion.
DATA1_TABLE_ID,
1,
DATA1_TABLE_DESCRIPTOR,
System.currentTimeMillis(),
System.currentTimeMillis()),
changedBucket1BucketMetadata)),
changedBucket1BucketMetadata,
true)),
Collections.emptyList()));
assertThat(serverMetadataCache.getTablePath(DATA1_TABLE_ID)).isEmpty();

Expand All @@ -232,9 +230,9 @@ void testUpdateClusterMetadataRequest() {
new PartitionMetadata(
partitionTableId,
partitionName1,
DELETED_PARTITION_ID, // mark this partition as
// deletion.
Collections.emptyList()))));
partitionId1,
Collections.emptyList(),
true))));
assertThat(serverMetadataCache.getPhysicalTablePath(partitionId1)).isEmpty();
assertPartitionMetadataEquals(
partitionId2,
Expand Down
Loading