diff --git a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java index 514025ae9f..fee514e6fb 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/metadata/MetadataUpdaterTest.java @@ -24,6 +24,7 @@ import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.server.testutils.FlussClusterExtension; @@ -51,6 +52,7 @@ void testRebuildClusterNTimes() throws Exception { Admin admin = conn.getAdmin(); TablePath tablePath = TablePath.of("fluss", "test"); admin.createTable(tablePath, DATA1_TABLE_DESCRIPTOR, true).get(); + TableInfo tableInfo = admin.getTableInfo(tablePath).get(); admin.close(); conn.close(); @@ -59,6 +61,7 @@ void testRebuildClusterNTimes() throws Exception { // update metadata metadataUpdater.updateMetadata(Collections.singleton(tablePath), null, null); Cluster cluster = metadataUpdater.getCluster(); + assertThat(cluster.getTable(tablePath).get()).isEqualTo(tableInfo); // repeat 20K times to reproduce StackOverflowError if there is // any N levels UnmodifiableCollection diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 2ddaa44e8a..620377fe48 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -173,6 +173,7 @@ message UpdateMetadataRequest { repeated PbTableMetadata table_metadata = 3; repeated PbPartitionMetadata partition_metadata = 4; optional int32 coordinator_epoch = 5; + repeated PbTableMetadataV2 table_metadata_v2 = 6; } message UpdateMetadataResponse { @@ -575,6 +576,19 @@ message PbTableMetadata { // trace by: https://github.com/apache/fluss/issues/981 } +message PbTableMetadataV2 { + required PbTablePath table_path = 1; + required int64 table_id = 2; + required int32 schema_id = 3; + optional bytes table_json = 4; + repeated PbBucketMetadata bucket_metadata = 5; + required int64 created_time = 6; + required int64 modified_time = 7; + + // TODO add a new filed 'deleted_table' to indicate this table is deleted in UpdateMetadataRequest. + // trace by: https://github.com/alibaba/fluss/issues/981 +} + message PbPartitionMetadata { required int64 table_id = 1; // the partition name and id for the partition diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 504b2b4396..45cc07492f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -618,7 +618,14 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { (tableId, bucketMetadataList) -> { TableInfo tableInfo = getTableInfo(tableId); if (tableInfo != null) { - tableMetadataList.add(new TableMetadata(tableInfo, bucketMetadataList)); + tableMetadataList.add( + new TableMetadata( + tableInfo.getTableId(), + tableInfo.getSchemaId(), + tableInfo.getCreatedTime(), + tableInfo.getModifiedTime(), + tableInfo.getTablePath(), + bucketMetadataList)); } }); @@ -662,7 +669,13 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { TableInfo tableInfo = getTableInfo(tableId); if (tableInfo != null) { tableMetadataList.add( - new TableMetadata(getTableInfo(tableId), Collections.emptyList())); + new TableMetadata( + tableInfo.getTableId(), + tableInfo.getSchemaId(), + tableInfo.getCreatedTime(), + tableInfo.getModifiedTime(), + tableInfo.getTablePath(), + Collections.emptyList())); } }); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TableMetadata.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TableMetadata.java index 6f51014d61..28ecd6b5e4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TableMetadata.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TableMetadata.java @@ -20,6 +20,8 @@ import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import javax.annotation.Nullable; + import java.util.List; import java.util.Objects; @@ -40,7 +42,18 @@ public class TableMetadata { */ public static final Long DELETED_TABLE_ID = -2L; - private final TableInfo tableInfo; + private final long tableId; + + private final int schemaId; + + private final long createdTime; + + private final long modifiedTime; + + private final TablePath tablePath; + + /** Will only be set for {@link org.apache.fluss.rpc.messages.MetadataResponse}. */ + private final @Nullable TableInfo tableInfo; /** * For partition table, this list is always empty. The detail partition metadata is stored in @@ -53,10 +66,64 @@ public class TableMetadata { private final List bucketMetadataList; public TableMetadata(TableInfo tableInfo, List bucketMetadataList) { - this.tableInfo = tableInfo; + this( + tableInfo.getTableId(), + tableInfo.getSchemaId(), + tableInfo.getCreatedTime(), + tableInfo.getModifiedTime(), + tableInfo.getTablePath(), + bucketMetadataList, + tableInfo); + } + + public TableMetadata( + long tableId, + int schemaId, + long createdTime, + long modifiedTime, + TablePath tablePath, + List bucketMetadataList) { + this(tableId, schemaId, createdTime, modifiedTime, tablePath, bucketMetadataList, null); + } + + public TableMetadata( + long tableId, + int schemaId, + long createdTime, + long modifiedTime, + TablePath tablePath, + List bucketMetadataList, + TableInfo tableInfo) { + this.tableId = tableId; + this.schemaId = schemaId; + this.createdTime = createdTime; + this.modifiedTime = modifiedTime; + this.tablePath = tablePath; this.bucketMetadataList = bucketMetadataList; + this.tableInfo = tableInfo; } + public long getTableId() { + return tableId; + } + + public int getSchemaId() { + return schemaId; + } + + public long getCreatedTime() { + return createdTime; + } + + public long getModifiedTime() { + return modifiedTime; + } + + public TablePath getTablePath() { + return tablePath; + } + + @Nullable public TableInfo getTableInfo() { return tableInfo; } @@ -68,7 +135,17 @@ public List getBucketMetadataList() { @Override public String toString() { return "TableMetadata{" - + "tableInfo=" + + "tableId=" + + tableId + + ", schemaId=" + + schemaId + + ", createdTime=" + + createdTime + + ", modifiedTime=" + + modifiedTime + + ", tablePath=" + + tablePath + + ", tableInfo=" + tableInfo + ", bucketMetadataList=" + bucketMetadataList @@ -77,21 +154,28 @@ public String toString() { @Override public boolean equals(Object o) { - if (this == o) { - return true; - } if (o == null || getClass() != o.getClass()) { return false; } TableMetadata that = (TableMetadata) o; - if (!tableInfo.equals(that.tableInfo)) { - return false; - } - return bucketMetadataList.equals(that.bucketMetadataList); + return tableId == that.tableId + && schemaId == that.schemaId + && createdTime == that.createdTime + && modifiedTime == that.modifiedTime + && Objects.equals(tablePath, that.tablePath) + && Objects.equals(tableInfo, that.tableInfo) + && Objects.equals(bucketMetadataList, that.bucketMetadataList); } @Override public int hashCode() { - return Objects.hash(tableInfo, bucketMetadataList); + return Objects.hash( + tableId, + schemaId, + createdTime, + modifiedTime, + tablePath, + tableInfo, + bucketMetadataList); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java index ff6855e135..73c703eb78 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java @@ -115,6 +115,10 @@ public Optional getPhysicalTablePath(long partitionId) { return serverMetadataSnapshot.getPhysicalTablePath(partitionId); } + public Map getBucketMetadataForTable(long tableId) { + return serverMetadataSnapshot.getBucketMetadataForTable(tableId); + } + public TableMetadata getTableMetadata(TablePath tablePath) { // always get table info from zk. TableInfo tableInfo = metadataManager.getTable(tablePath); @@ -187,9 +191,8 @@ public void updateClusterMetadata(ClusterMetadata clusterMetadata) { new HashMap<>(serverMetadataSnapshot.getBucketMetadataMapForTables()); for (TableMetadata tableMetadata : clusterMetadata.getTableMetadataList()) { - TableInfo tableInfo = tableMetadata.getTableInfo(); - TablePath tablePath = tableInfo.getTablePath(); - long tableId = tableInfo.getTableId(); + TablePath tablePath = tableMetadata.getTablePath(); + long tableId = tableMetadata.getTableId(); if (tableId == DELETED_TABLE_ID) { Long removedTableId = tableIdByPath.remove(tablePath); if (removedTableId != null) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index f77cf378ed..82c24c2356 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -17,17 +17,18 @@ package org.apache.fluss.server.utils; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.Endpoint; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.token.ObtainedSecurityToken; import org.apache.fluss.metadata.PartitionSpec; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.ResolvedPartitionSpec; import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.BytesViewLogRecords; @@ -115,6 +116,7 @@ import org.apache.fluss.rpc.messages.PbStopReplicaRespForBucket; import org.apache.fluss.rpc.messages.PbTableBucket; import org.apache.fluss.rpc.messages.PbTableMetadata; +import org.apache.fluss.rpc.messages.PbTableMetadataV2; import org.apache.fluss.rpc.messages.PbTablePath; import org.apache.fluss.rpc.messages.PbValue; import org.apache.fluss.rpc.messages.PbValueList; @@ -303,15 +305,15 @@ public static UpdateMetadataRequest makeUpdateMetadataRequest( .setPort(coordinatorServer.endpoints().get(0).getPort()); } - List pbTableMetadataList = new ArrayList<>(); + List pbTableMetadataV2List = new ArrayList<>(); tableMetadataList.forEach( - tableMetadata -> pbTableMetadataList.add(toPbTableMetadata(tableMetadata))); + tableMetadata -> pbTableMetadataV2List.add(toPbTableMetadataV2(tableMetadata))); List pbPartitionMetadataList = new ArrayList<>(); partitionMetadataList.forEach( partitionMetadata -> pbPartitionMetadataList.add(toPbPartitionMetadata(partitionMetadata))); - updateMetadataRequest.addAllTableMetadatas(pbTableMetadataList); + updateMetadataRequest.addAllTableMetadataV2s(pbTableMetadataV2List); updateMetadataRequest.addAllPartitionMetadatas(pbPartitionMetadataList); return updateMetadataRequest; @@ -360,8 +362,16 @@ public static ClusterMetadata getUpdateMetadataRequestData(UpdateMetadataRequest } List tableMetadataList = new ArrayList<>(); - request.getTableMetadatasList() - .forEach(tableMetadata -> tableMetadataList.add(toTableMetaData(tableMetadata))); + if (request.getTableMetadataV2sCount() > 0) { + request.getTableMetadataV2sList() + .forEach( + tableMetadata -> tableMetadataList.add(toTableMetaData(tableMetadata))); + } else { + // for backward compatibility for versions <= 0.7 + request.getTableMetadatasList() + .forEach( + tableMetadata -> tableMetadataList.add(toTableMetaData(tableMetadata))); + } List partitionMetadataList = new ArrayList<>(); request.getPartitionMetadatasList() @@ -375,6 +385,11 @@ public static ClusterMetadata getUpdateMetadataRequestData(UpdateMetadataRequest private static PbTableMetadata toPbTableMetadata(TableMetadata tableMetadata) { TableInfo tableInfo = tableMetadata.getTableInfo(); + if (tableInfo == null) { + // We set TableInfo for metadata request, so it should not be null. + throw new FlussRuntimeException( + String.format("TableInfo for %s is null", tableMetadata.getTablePath())); + } PbTableMetadata pbTableMetadata = new PbTableMetadata() .setTableId(tableInfo.getTableId()) @@ -392,6 +407,23 @@ private static PbTableMetadata toPbTableMetadata(TableMetadata tableMetadata) { return pbTableMetadata; } + private static PbTableMetadataV2 toPbTableMetadataV2(TableMetadata tableMetadata) { + PbTableMetadataV2 pbTableMetadata = + new PbTableMetadataV2() + .setTableId(tableMetadata.getTableId()) + .setSchemaId(tableMetadata.getSchemaId()) + .setCreatedTime(tableMetadata.getCreatedTime()) + .setModifiedTime(tableMetadata.getModifiedTime()); + TablePath tablePath = tableMetadata.getTablePath(); + pbTableMetadata + .setTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + pbTableMetadata.addAllBucketMetadatas( + toPbBucketMetadata(tableMetadata.getBucketMetadataList())); + return pbTableMetadata; + } + private static PbPartitionMetadata toPbPartitionMetadata(PartitionMetadata partitionMetadata) { PbPartitionMetadata pbPartitionMetadata = new PbPartitionMetadata() @@ -403,7 +435,8 @@ private static PbPartitionMetadata toPbPartitionMetadata(PartitionMetadata parti return pbPartitionMetadata; } - private static List toPbBucketMetadata( + @VisibleForTesting + public static List toPbBucketMetadata( List bucketMetadataList) { List pbBucketMetadataList = new ArrayList<>(); for (BucketMetadata bucketMetadata : bucketMetadataList) { @@ -430,23 +463,33 @@ private static List toPbBucketMetadata( } private static TableMetadata toTableMetaData(PbTableMetadata pbTableMetadata) { - TablePath tablePath = toTablePath(pbTableMetadata.getTablePath()); - long tableId = pbTableMetadata.getTableId(); - TableInfo tableInfo = - TableInfo.of( - tablePath, - tableId, - pbTableMetadata.getSchemaId(), - TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson()), - pbTableMetadata.getCreatedTime(), - pbTableMetadata.getModifiedTime()); - List bucketMetadata = new ArrayList<>(); for (PbBucketMetadata pbBucketMetadata : pbTableMetadata.getBucketMetadatasList()) { bucketMetadata.add(toBucketMetadata(pbBucketMetadata)); } - return new TableMetadata(tableInfo, bucketMetadata); + return new TableMetadata( + pbTableMetadata.getTableId(), + pbTableMetadata.getSchemaId(), + pbTableMetadata.getCreatedTime(), + pbTableMetadata.getModifiedTime(), + toTablePath(pbTableMetadata.getTablePath()), + bucketMetadata); + } + + private static TableMetadata toTableMetaData(PbTableMetadataV2 pbTableMetadataV2) { + List bucketMetadata = new ArrayList<>(); + for (PbBucketMetadata pbBucketMetadata : pbTableMetadataV2.getBucketMetadatasList()) { + bucketMetadata.add(toBucketMetadata(pbBucketMetadata)); + } + + return new TableMetadata( + pbTableMetadataV2.getTableId(), + pbTableMetadataV2.getSchemaId(), + pbTableMetadataV2.getCreatedTime(), + pbTableMetadataV2.getModifiedTime(), + toTablePath(pbTableMetadataV2.getTablePath()), + bucketMetadata); } private static BucketMetadata toBucketMetadata(PbBucketMetadata pbBucketMetadata) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java index 90f012b838..b40a97e278 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/TabletServerMetadataCacheTest.java @@ -307,6 +307,11 @@ private void assertTableMetadataEquals( List expectedBucketMetadataList) { TablePath tablePath = serverMetadataCache.getTablePath(tableId).get(); TableMetadata tableMetadata = serverMetadataCache.getTableMetadata(tablePath); + assertThat(tableMetadata.getTableId()).isEqualTo(expectedTableInfo.getTableId()); + assertThat(tableMetadata.getSchemaId()).isEqualTo(expectedTableInfo.getSchemaId()); + assertThat(tableMetadata.getCreatedTime()).isEqualTo(expectedTableInfo.getCreatedTime()); + assertThat(tableMetadata.getModifiedTime()).isEqualTo(expectedTableInfo.getModifiedTime()); + assertThat(tableMetadata.getTablePath()).isEqualTo(expectedTableInfo.getTablePath()); assertThat(tableMetadata.getTableInfo()).isEqualTo(expectedTableInfo); assertThat(tableMetadata.getBucketMetadataList()) .hasSameElementsAs(expectedBucketMetadataList); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 04f8f3ce38..366b3ad9ac 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -25,6 +25,7 @@ import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableDescriptor; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.record.DefaultKvRecordBatch; import org.apache.fluss.record.DefaultValueRecordBatch; @@ -35,6 +36,7 @@ import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.ListOffsetsResponse; +import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrRequest; import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrResponse; import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket; @@ -44,13 +46,18 @@ import org.apache.fluss.rpc.messages.PbNotifyLeaderAndIsrReqForBucket; import org.apache.fluss.rpc.messages.PbPrefixLookupRespForBucket; import org.apache.fluss.rpc.messages.PbPutKvRespForBucket; +import org.apache.fluss.rpc.messages.PbTableMetadata; import org.apache.fluss.rpc.messages.ProduceLogResponse; import org.apache.fluss.rpc.messages.PutKvResponse; +import org.apache.fluss.rpc.messages.UpdateMetadataRequest; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.server.entity.NotifyLeaderAndIsrData; import org.apache.fluss.server.entity.NotifyLeaderAndIsrResultForBucket; import org.apache.fluss.server.log.ListOffsetsParam; +import org.apache.fluss.server.metadata.BucketMetadata; import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.metadata.TableMetadata; +import org.apache.fluss.server.metadata.TabletServerMetadataCache; import org.apache.fluss.server.testutils.FlussClusterExtension; import org.apache.fluss.server.utils.ServerRpcMessageUtils; import org.apache.fluss.server.zk.data.LeaderAndIsr; @@ -104,6 +111,7 @@ import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getNotifyLeaderAndIsrResponseData; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeNotifyBucketLeaderAndIsr; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.makeUpdateMetadataRequest; +import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toPbBucketMetadata; import static org.apache.fluss.testutils.DataTestUtils.compactedRow; import static org.apache.fluss.testutils.DataTestUtils.genKvRecordBatch; import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject; @@ -890,6 +898,74 @@ void testBecomeLeaderOrFollowerWithOneTabletServerOffline() throws Exception { assertThat(result.get(0).getError().error()).isEqualTo(Errors.NONE); } + @Test + void testUpdateMetadataCapabilities() throws Exception { + long tableId = + createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR); + TableBucket tb = new TableBucket(tableId, 0); + FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(tb); + + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + + TabletServerMetadataCache metadataCache = + FLUSS_CLUSTER_EXTENSION.getTabletServerById(leader).getMetadataCache(); + assertThat(metadataCache.getBucketMetadataForTable(tableId).size()).isEqualTo(3); + + // add bucket meta + List bucketMetadataList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + BucketMetadata bucketMetadata = + new BucketMetadata(i, i, i, Arrays.asList(i, i + 1, i + 2)); + bucketMetadataList.add(bucketMetadata); + } + + // 1. test update metadata with PbTableMetadataV2 + TableMetadata tableMetadata = + new TableMetadata( + tableId, + 0, + System.currentTimeMillis(), + System.currentTimeMillis(), + DATA1_TABLE_PATH, + bucketMetadataList); + UpdateMetadataRequest updateMetadataRequest = + makeUpdateMetadataRequest( + null, + Collections.emptySet(), + Collections.singletonList(tableMetadata), + Collections.emptyList()); + leaderGateWay.updateMetadata(updateMetadataRequest).get(); + + // verify update + assertThat(metadataCache.getBucketMetadataForTable(tableId).values()) + .containsExactlyInAnyOrderElementsOf(bucketMetadataList); + + // clear the metadataCache + metadataCache.clearTableMetadata(); + assertThat(metadataCache.getBucketMetadataForTable(tableId)).isEmpty(); + + // 2. test update metadata with PbTableMetadata + TableInfo tableInfo = FLUSS_CLUSTER_EXTENSION.getTableInfo(DATA1_TABLE_PATH); + TableMetadata tableMetadataV1 = new TableMetadata(tableInfo, bucketMetadataList); + List pbTableMetadataList = new ArrayList<>(); + pbTableMetadataList.add(toPbTableMetadata(tableMetadataV1, tableInfo)); + + UpdateMetadataRequest updateMetadataRequestV1 = new UpdateMetadataRequest(); + updateMetadataRequestV1.addAllTableMetadatas(pbTableMetadataList); + leaderGateWay.updateMetadata(updateMetadataRequestV1).get(); + + TableMetadata cacheTableMetadata = metadataCache.getTableMetadata(DATA1_TABLE_PATH); + assertThat(cacheTableMetadata.getTableId()).isEqualTo(tableInfo.getTableId()); + assertThat(cacheTableMetadata.getSchemaId()).isEqualTo(tableInfo.getSchemaId()); + assertThat(cacheTableMetadata.getCreatedTime()).isEqualTo(tableInfo.getCreatedTime()); + assertThat(cacheTableMetadata.getModifiedTime()).isEqualTo(tableInfo.getModifiedTime()); + assertThat(cacheTableMetadata.getTablePath()).isEqualTo(tableInfo.getTablePath()); + assertThat(metadataCache.getBucketMetadataForTable(tableId).values()) + .containsExactlyElementsOf(bucketMetadataList); + } + private static void assertPutKvResponse(PutKvResponse putKvResponse) { assertThat(putKvResponse.getBucketsRespsCount()).isEqualTo(1); PbPutKvRespForBucket putKvRespForBucket = putKvResponse.getBucketsRespsList().get(0); @@ -950,4 +1026,36 @@ private NotifyLeaderAndIsrRequest makeNotifyLeaderAndIsrRequest( return ServerRpcMessageUtils.makeNotifyLeaderAndIsrRequest( 0, Collections.singletonList(reqForBucket)); } + + private static PbTableMetadata toPbTableMetadata( + TableMetadata tableMetadata, TableInfo tableInfo) { + PbTableMetadata pbTableMetadata = + new PbTableMetadata() + .setTableId(tableInfo.getTableId()) + .setSchemaId(tableInfo.getSchemaId()) + .setTableJson(tableInfo.toTableDescriptor().toJsonBytes()) + .setCreatedTime(tableInfo.getCreatedTime()) + .setModifiedTime(tableInfo.getModifiedTime()); + TablePath tablePath = tableInfo.getTablePath(); + pbTableMetadata + .setTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + pbTableMetadata.addAllBucketMetadatas( + toPbBucketMetadata(tableMetadata.getBucketMetadataList())); + return pbTableMetadata; + } + + private static MetadataRequest makeMetadataRequest(Set tablePaths) { + MetadataRequest metadataRequest = new MetadataRequest(); + if (tablePaths != null) { + for (TablePath tablePath : tablePaths) { + metadataRequest + .addTablePath() + .setDatabaseName(tablePath.getDatabaseName()) + .setTableName(tablePath.getTableName()); + } + } + return metadataRequest; + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 05d85edf5b..dcf3e2e8f8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -26,6 +26,7 @@ import org.apache.fluss.fs.local.LocalFileSystem; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.rpc.GatewayClientProxy; @@ -452,6 +453,10 @@ public List getTabletServerNodes(@Nullable String listenerName) { .collect(Collectors.toList()); } + public TableInfo getTableInfo(TablePath tablePath) { + return metadataManager.getTable(tablePath); + } + public ZooKeeperClient getZooKeeperClient() { return zooKeeperClient; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/TableMetadataAssert.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/TableMetadataAssert.java index 25154e851b..df70424499 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/TableMetadataAssert.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/TableMetadataAssert.java @@ -38,6 +38,9 @@ private TableMetadataAssert(TableMetadata actual) { } public TableMetadataAssert isEqualTo(TableMetadata expected) { + assertThat(expected.getTableId()).isEqualTo(actual.getTableId()); + assertThat(expected.getSchemaId()).isEqualTo(actual.getSchemaId()); + assertThat(expected.getTablePath()).isEqualTo(actual.getTablePath()); assertThat(expected.getTableInfo()).isEqualTo(actual.getTableInfo()); List bucketMetadataList = expected.getBucketMetadataList(); List actualBucketMetadataList = actual.getBucketMetadataList();