Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.cluster.ServerNode;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.server.testutils.FlussClusterExtension;
Expand Down Expand Up @@ -51,6 +52,7 @@ void testRebuildClusterNTimes() throws Exception {
Admin admin = conn.getAdmin();
TablePath tablePath = TablePath.of("fluss", "test");
admin.createTable(tablePath, DATA1_TABLE_DESCRIPTOR, true).get();
TableInfo tableInfo = admin.getTableInfo(tablePath).get();
admin.close();
conn.close();

Expand All @@ -59,6 +61,7 @@ void testRebuildClusterNTimes() throws Exception {
// update metadata
metadataUpdater.updateMetadata(Collections.singleton(tablePath), null, null);
Cluster cluster = metadataUpdater.getCluster();
assertThat(cluster.getTable(tablePath).get()).isEqualTo(tableInfo);

// repeat 20K times to reproduce StackOverflowError if there is
// any N levels UnmodifiableCollection
Expand Down
14 changes: 14 additions & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ message UpdateMetadataRequest {
repeated PbTableMetadata table_metadata = 3;
repeated PbPartitionMetadata partition_metadata = 4;
optional int32 coordinator_epoch = 5;
repeated PbTableMetadataV2 table_metadata_v2 = 6;
}

message UpdateMetadataResponse {
Expand Down Expand Up @@ -575,6 +576,19 @@ message PbTableMetadata {
// trace by: https://github.com/apache/fluss/issues/981
}

message PbTableMetadataV2 {
required PbTablePath table_path = 1;
required int64 table_id = 2;
required int32 schema_id = 3;
optional bytes table_json = 4;
repeated PbBucketMetadata bucket_metadata = 5;
required int64 created_time = 6;
required int64 modified_time = 7;

// TODO add a new filed 'deleted_table' to indicate this table is deleted in UpdateMetadataRequest.
// trace by: https://github.com/alibaba/fluss/issues/981
}

message PbPartitionMetadata {
required int64 table_id = 1;
// the partition name and id for the partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,14 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
(tableId, bucketMetadataList) -> {
TableInfo tableInfo = getTableInfo(tableId);
if (tableInfo != null) {
tableMetadataList.add(new TableMetadata(tableInfo, bucketMetadataList));
tableMetadataList.add(
new TableMetadata(
tableInfo.getTableId(),
tableInfo.getSchemaId(),
tableInfo.getCreatedTime(),
tableInfo.getModifiedTime(),
tableInfo.getTablePath(),
bucketMetadataList));
}
});

Expand Down Expand Up @@ -662,7 +669,13 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
TableInfo tableInfo = getTableInfo(tableId);
if (tableInfo != null) {
tableMetadataList.add(
new TableMetadata(getTableInfo(tableId), Collections.emptyList()));
new TableMetadata(
tableInfo.getTableId(),
tableInfo.getSchemaId(),
tableInfo.getCreatedTime(),
tableInfo.getModifiedTime(),
tableInfo.getTablePath(),
Collections.emptyList()));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Objects;

Expand All @@ -40,7 +42,18 @@ public class TableMetadata {
*/
public static final Long DELETED_TABLE_ID = -2L;

private final TableInfo tableInfo;
private final long tableId;

private final int schemaId;

private final long createdTime;

private final long modifiedTime;

private final TablePath tablePath;

/** Will only be set for {@link org.apache.fluss.rpc.messages.MetadataResponse}. */
private final @Nullable TableInfo tableInfo;

/**
* For partition table, this list is always empty. The detail partition metadata is stored in
Expand All @@ -53,10 +66,64 @@ public class TableMetadata {
private final List<BucketMetadata> bucketMetadataList;

public TableMetadata(TableInfo tableInfo, List<BucketMetadata> bucketMetadataList) {
this.tableInfo = tableInfo;
this(
tableInfo.getTableId(),
tableInfo.getSchemaId(),
tableInfo.getCreatedTime(),
tableInfo.getModifiedTime(),
tableInfo.getTablePath(),
bucketMetadataList,
tableInfo);
}

public TableMetadata(
long tableId,
int schemaId,
long createdTime,
long modifiedTime,
TablePath tablePath,
List<BucketMetadata> bucketMetadataList) {
this(tableId, schemaId, createdTime, modifiedTime, tablePath, bucketMetadataList, null);
}

public TableMetadata(
long tableId,
int schemaId,
long createdTime,
long modifiedTime,
TablePath tablePath,
List<BucketMetadata> bucketMetadataList,
TableInfo tableInfo) {
this.tableId = tableId;
this.schemaId = schemaId;
this.createdTime = createdTime;
this.modifiedTime = modifiedTime;
this.tablePath = tablePath;
this.bucketMetadataList = bucketMetadataList;
this.tableInfo = tableInfo;
}

public long getTableId() {
return tableId;
}

public int getSchemaId() {
return schemaId;
}

public long getCreatedTime() {
return createdTime;
}

public long getModifiedTime() {
return modifiedTime;
}

public TablePath getTablePath() {
return tablePath;
}

@Nullable
public TableInfo getTableInfo() {
return tableInfo;
}
Expand All @@ -68,7 +135,17 @@ public List<BucketMetadata> getBucketMetadataList() {
@Override
public String toString() {
return "TableMetadata{"
+ "tableInfo="
+ "tableId="
+ tableId
+ ", schemaId="
+ schemaId
+ ", createdTime="
+ createdTime
+ ", modifiedTime="
+ modifiedTime
+ ", tablePath="
+ tablePath
+ ", tableInfo="
+ tableInfo
+ ", bucketMetadataList="
+ bucketMetadataList
Expand All @@ -77,21 +154,28 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TableMetadata that = (TableMetadata) o;
if (!tableInfo.equals(that.tableInfo)) {
return false;
}
return bucketMetadataList.equals(that.bucketMetadataList);
return tableId == that.tableId
&& schemaId == that.schemaId
&& createdTime == that.createdTime
&& modifiedTime == that.modifiedTime
&& Objects.equals(tablePath, that.tablePath)
&& Objects.equals(tableInfo, that.tableInfo)
&& Objects.equals(bucketMetadataList, that.bucketMetadataList);
}

@Override
public int hashCode() {
return Objects.hash(tableInfo, bucketMetadataList);
return Objects.hash(
tableId,
schemaId,
createdTime,
modifiedTime,
tablePath,
tableInfo,
bucketMetadataList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ public Optional<PhysicalTablePath> getPhysicalTablePath(long partitionId) {
return serverMetadataSnapshot.getPhysicalTablePath(partitionId);
}

public Map<Integer, BucketMetadata> getBucketMetadataForTable(long tableId) {
return serverMetadataSnapshot.getBucketMetadataForTable(tableId);
}

public TableMetadata getTableMetadata(TablePath tablePath) {
// always get table info from zk.
TableInfo tableInfo = metadataManager.getTable(tablePath);
Expand Down Expand Up @@ -187,9 +191,8 @@ public void updateClusterMetadata(ClusterMetadata clusterMetadata) {
new HashMap<>(serverMetadataSnapshot.getBucketMetadataMapForTables());

for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) {
TableInfo tableInfo = tableMetadata.getTableInfo();
TablePath tablePath = tableInfo.getTablePath();
long tableId = tableInfo.getTableId();
TablePath tablePath = tableMetadata.getTablePath();
long tableId = tableMetadata.getTableId();
if (tableId == DELETED_TABLE_ID) {
Long removedTableId = tableIdByPath.remove(tablePath);
if (removedTableId != null) {
Expand Down
Loading