diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java b/paimon-common/src/main/java/org/apache/paimon/options/Options.java index d292fef3bf14..46e70e22af7e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java @@ -149,8 +149,8 @@ public synchronized Options removePrefix(String prefix) { return new Options(convertToPropertiesPrefixKey(data, prefix)); } - public synchronized void remove(String key) { - data.remove(key); + public synchronized String remove(String key) { + return data.remove(key); } public synchronized void remove(ConfigOption option) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index dedfd5f11276..f766df0c5caf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -19,7 +19,6 @@ package org.apache.paimon.catalog; import org.apache.paimon.CoreOptions; -import org.apache.paimon.TableType; import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; @@ -32,18 +31,13 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.Preconditions; - -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; @@ -55,13 +49,12 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.OBJECT_LOCATION; +import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.CoreOptions.createCommitUser; -import static org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema; import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; -import static org.apache.paimon.catalog.CatalogUtils.getTableType; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose; @@ -69,7 +62,6 @@ import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; -import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Common implementation of {@link Catalog}. */ public abstract class AbstractCatalog implements Catalog { @@ -100,6 +92,11 @@ public FileIO fileIO() { return fileIO; } + @Override + public FileIO fileIO(Path path) { + return fileIO; + } + public Optional lockFactory() { if (!lockEnabled()) { return Optional.empty(); @@ -370,67 +367,9 @@ protected abstract void alterTableImpl(Identifier identifier, List @Override public Table getTable(Identifier identifier) throws TableNotExistException { - if (isSystemDatabase(identifier.getDatabaseName())) { - return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this); - } else if (identifier.isSystemTable()) { - Table originTable = - getDataOrFormatTable( - new Identifier( - identifier.getDatabaseName(), - identifier.getTableName(), - identifier.getBranchName(), - null)); - return CatalogUtils.createSystemTable(identifier, originTable); - } else { - return getDataOrFormatTable(identifier); - } - } - - // hive override this method. - protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException { - Preconditions.checkArgument(identifier.getSystemTableName() == null); - TableMeta tableMeta = getDataTableMeta(identifier); - TableType tableType = getTableType(tableMeta.schema().options()); - if (tableType == TableType.FORMAT_TABLE) { - TableSchema schema = tableMeta.schema(); - return buildFormatTableByTableSchema( - identifier, - schema.options(), - schema.logicalRowType(), - schema.partitionKeys(), - schema.comment()); - } - FileStoreTable table = - FileStoreTableFactory.create( - fileIO, - getTableLocation(identifier), - tableMeta.schema, - new CatalogEnvironment( - identifier, - tableMeta.uuid, - Lock.factory( - lockFactory().orElse(null), - lockContext().orElse(null), - identifier), - catalogLoader())); - if (tableType == TableType.OBJECT_TABLE) { - String objectLocation = table.coreOptions().objectLocation(); - checkNotNull(objectLocation, "Object location should not be null for object table."); - table = - ObjectTable.builder() - .underlyingTable(table) - .objectLocation(objectLocation) - .objectFileIO(objectFileIO(objectLocation)) - .build(); - } - return table; - } - - /** - * Catalog implementation may override this method to provide {@link FileIO} to object table. - */ - protected FileIO objectFileIO(String objectLocation) { - return fileIO; + Lock.Factory lockFactory = + Lock.factory(lockFactory().orElse(null), lockContext().orElse(null), identifier); + return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, lockFactory); } /** @@ -455,11 +394,11 @@ public Path newDatabasePath(String database) { return newDatabasePath(warehouse(), database); } - protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExistException { - return new TableMeta(getDataTableSchema(identifier), null); + protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { + return new TableMetadata(loadTableSchema(identifier), null); } - protected abstract TableSchema getDataTableSchema(Identifier identifier) + protected abstract TableSchema loadTableSchema(Identifier identifier) throws TableNotExistException; public Path getTableLocation(Identifier identifier) { @@ -537,38 +476,17 @@ protected boolean tableExistsInFileSystem(Path tablePath, String branchName) { } public Optional tableSchemaInFileSystem(Path tablePath, String branchName) { - return new SchemaManager(fileIO, tablePath, branchName) - .latest() - .map( - s -> { - if (!DEFAULT_MAIN_BRANCH.equals(branchName)) { + Optional schema = new SchemaManager(fileIO, tablePath, branchName).latest(); + if (!DEFAULT_MAIN_BRANCH.equals(branchName)) { + schema = + schema.map( + s -> { Options branchOptions = new Options(s.options()); branchOptions.set(CoreOptions.BRANCH, branchName); return s.copy(branchOptions.toMap()); - } else { - return s; - } - }); - } - - /** Table metadata. */ - protected static class TableMeta { - - private final TableSchema schema; - @Nullable private final String uuid; - - public TableMeta(TableSchema schema, @Nullable String uuid) { - this.schema = schema; - this.uuid = uuid; - } - - public TableSchema schema() { - return schema; - } - - @Nullable - public String uuid() { - return uuid; + }); } + schema.ifPresent(s -> s.options().put(PATH.key(), tablePath.toString())); + return schema; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index d0ad86c224d7..a0d78b268880 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -20,6 +20,7 @@ import org.apache.paimon.annotation.Public; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -374,6 +375,9 @@ default void repairTable(Identifier identifier) throws TableNotExistException { /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */ FileIO fileIO(); + /** {@link FileIO} of this catalog. */ + FileIO fileIO(Path path); + /** Catalog options for re-creating this catalog. */ Map options(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index 3c245040ccd6..3ceef461e873 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -20,18 +20,23 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.TableType; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.operation.Lock; import org.apache.paimon.options.Options; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.table.system.AllTableOptionsTable; import org.apache.paimon.table.system.CatalogOptionsTable; import org.apache.paimon.table.system.SystemTableLoader; -import org.apache.paimon.types.RowType; import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Preconditions; @@ -42,6 +47,7 @@ import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME; +import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; @@ -129,7 +135,76 @@ public static void validateAutoCreateClose(Map options) { CoreOptions.AUTO_CREATE.key(), Boolean.FALSE)); } - public static Table createGlobalSystemTable(String tableName, Catalog catalog) + public static List listPartitionsFromFileSystem(Table table) { + Options options = Options.fromMap(table.options()); + InternalRowPartitionComputer computer = + new InternalRowPartitionComputer( + options.get(PARTITION_DEFAULT_NAME), + table.rowType().project(table.partitionKeys()), + table.partitionKeys().toArray(new String[0]), + options.get(PARTITION_GENERATE_LEGCY_NAME)); + List partitionEntries = + table.newReadBuilder().newScan().listPartitionEntries(); + List partitions = new ArrayList<>(partitionEntries.size()); + for (PartitionEntry entry : partitionEntries) { + partitions.add( + new Partition( + computer.generatePartValues(entry.partition()), + entry.recordCount(), + entry.fileSizeInBytes(), + entry.fileCount(), + entry.lastFileCreationTime())); + } + return partitions; + } + + /** + * Load table from {@link Catalog}, this table can be: + * + *
    + *
  • 1. Global System table: contains the statistical information of all the tables exists. + *
  • 2. Format table: refers to a directory that contains multiple files of the same format. + *
  • 3. Data table: Normal {@link FileStoreTable}, primary key table or append table. + *
  • 4. Object table: provides metadata indexes for unstructured data in the location. + *
  • 5. System table: wraps Data table or Object table, such as the snapshots created. + *
+ */ + public static Table loadTable( + Catalog catalog, + Identifier identifier, + TableMetadata.Loader metadataLoader, + Lock.Factory lockFactory) + throws Catalog.TableNotExistException { + if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) { + return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog); + } + + TableMetadata metadata = metadataLoader.load(identifier); + TableSchema schema = metadata.schema(); + CoreOptions options = CoreOptions.fromMap(schema.options()); + if (options.type() == TableType.FORMAT_TABLE) { + return toFormatTable(identifier, schema); + } + + CatalogEnvironment catalogEnv = + new CatalogEnvironment( + identifier, metadata.uuid(), lockFactory, catalog.catalogLoader()); + Path path = new Path(schema.options().get(PATH.key())); + FileStoreTable table = + FileStoreTableFactory.create(catalog.fileIO(), path, schema, catalogEnv); + + if (options.type() == TableType.OBJECT_TABLE) { + table = toObjectTable(catalog, table); + } + + if (identifier.isSystemTable()) { + return CatalogUtils.createSystemTable(identifier, table); + } + + return table; + } + + private static Table createGlobalSystemTable(String tableName, Catalog catalog) throws Catalog.TableNotExistException { switch (tableName.toLowerCase()) { case ALL_TABLE_OPTIONS: @@ -154,7 +229,7 @@ public static Table createGlobalSystemTable(String tableName, Catalog catalog) } } - public static Table createSystemTable(Identifier identifier, Table originTable) + private static Table createSystemTable(Identifier identifier, Table originTable) throws Catalog.TableNotExistException { if (!(originTable instanceof FileStoreTable)) { throw new UnsupportedOperationException( @@ -172,41 +247,8 @@ public static Table createSystemTable(Identifier identifier, Table originTable) return table; } - public static List listPartitionsFromFileSystem(Table table) { - Options options = Options.fromMap(table.options()); - InternalRowPartitionComputer computer = - new InternalRowPartitionComputer( - options.get(PARTITION_DEFAULT_NAME), - table.rowType().project(table.partitionKeys()), - table.partitionKeys().toArray(new String[0]), - options.get(PARTITION_GENERATE_LEGCY_NAME)); - List partitionEntries = - table.newReadBuilder().newScan().listPartitionEntries(); - List partitions = new ArrayList<>(partitionEntries.size()); - for (PartitionEntry entry : partitionEntries) { - partitions.add( - new Partition( - computer.generatePartValues(entry.partition()), - entry.recordCount(), - entry.fileSizeInBytes(), - entry.fileCount(), - entry.lastFileCreationTime())); - } - return partitions; - } - - public static TableType getTableType(Map options) { - return options.containsKey(CoreOptions.TYPE.key()) - ? TableType.fromString(options.get(CoreOptions.TYPE.key())) - : CoreOptions.TYPE.defaultValue(); - } - - public static FormatTable buildFormatTableByTableSchema( - Identifier identifier, - Map options, - RowType rowType, - List partitionKeys, - String comment) { + private static FormatTable toFormatTable(Identifier identifier, TableSchema schema) { + Map options = schema.options(); FormatTable.Format format = FormatTable.parseFormat( options.getOrDefault( @@ -215,12 +257,23 @@ public static FormatTable buildFormatTableByTableSchema( String location = options.get(CoreOptions.PATH.key()); return FormatTable.builder() .identifier(identifier) - .rowType(rowType) - .partitionKeys(partitionKeys) + .rowType(schema.logicalRowType()) + .partitionKeys(schema.partitionKeys()) .location(location) .format(format) .options(options) - .comment(comment) + .comment(schema.comment()) + .build(); + } + + private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable underlyingTable) { + CoreOptions options = underlyingTable.coreOptions(); + String objectLocation = options.objectLocation(); + FileIO objectFileIO = catalog.fileIO(new Path(objectLocation)); + return ObjectTable.builder() + .underlyingTable(underlyingTable) + .objectLocation(objectLocation) + .objectFileIO(objectFileIO) .build(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index aa7852456e5a..847485a7a16b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -19,6 +19,7 @@ package org.apache.paimon.catalog; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.partition.Partition; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -61,6 +62,11 @@ public FileIO fileIO() { return wrapped.fileIO(); } + @Override + public FileIO fileIO(Path path) { + return wrapped.fileIO(path); + } + @Override public List listDatabases() { return wrapped.listDatabases(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 254826b91da4..f8e9a2aabc4d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -103,7 +103,7 @@ protected List listTablesImpl(String databaseName) { } @Override - public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { + public TableSchema loadTableSchema(Identifier identifier) throws TableNotExistException { return tableSchemaInFileSystem( getTableLocation(identifier), identifier.getBranchNameOrDefault()) .orElseThrow(() -> new TableNotExistException(identifier)); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java b/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java new file mode 100644 index 000000000000..81904476a268 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.catalog; + +import org.apache.paimon.schema.TableSchema; + +import javax.annotation.Nullable; + +/** Metadata for table. */ +public class TableMetadata { + + private final TableSchema schema; + @Nullable private final String uuid; + + public TableMetadata(TableSchema schema, @Nullable String uuid) { + this.schema = schema; + this.uuid = uuid; + } + + public TableSchema schema() { + return schema; + } + + @Nullable + public String uuid() { + return uuid; + } + + /** Loader to load {@link TableMetadata}. */ + public interface Loader { + TableMetadata load(Identifier identifier) throws Catalog.TableNotExistException; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 327c6b9676c8..8db9e723c295 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -357,15 +357,14 @@ protected void alterTableImpl(Identifier identifier, List changes) } @Override - protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { + protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExistException { assertMainBranch(identifier); if (!JdbcUtils.tableExists( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName())) { throw new TableNotExistException(identifier); } Path tableLocation = getTableLocation(identifier); - return new SchemaManager(fileIO, tableLocation) - .latest() + return tableSchemaInFileSystem(tableLocation, identifier.getBranchNameOrDefault()) .orElseThrow( () -> new RuntimeException("There is no paimon table in " + tableLocation)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 4b4e90fb7c20..97f9ffe56794 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -18,7 +18,6 @@ package org.apache.paimon.rest; -import org.apache.paimon.TableType; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogLoader; @@ -26,6 +25,7 @@ import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.catalog.TableMetadata; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.operation.FileStoreCommit; @@ -60,14 +60,10 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.Table; -import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -83,20 +79,16 @@ import java.util.concurrent.ScheduledExecutorService; import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE; -import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.CoreOptions.createCommitUser; -import static org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema; import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; -import static org.apache.paimon.catalog.CatalogUtils.getTableType; import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem; import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; import static org.apache.paimon.rest.RESTUtil.extractPrefixMap; import static org.apache.paimon.rest.auth.AuthSession.createAuthSession; -import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; /** A catalog implementation for REST. */ @@ -171,6 +163,11 @@ public FileIO fileIO() { return fileIO; } + @Override + public FileIO fileIO(Path path) { + return fileIO; + } + @Override public List listDatabases() { ListDatabasesResponse response = @@ -281,13 +278,28 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep @Override public Table getTable(Identifier identifier) throws TableNotExistException { - if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) { - return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this); - } else if (identifier.isSystemTable()) { - return getSystemTable(identifier); - } else { - return getDataOrFormatTable(identifier); + // TODO add lock from server + return CatalogUtils.loadTable( + this, identifier, this::loadTableMetadata, Lock.emptyFactory()); + } + + private TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { + GetTableResponse response; + try { + response = + client.get( + resourcePaths.table( + identifier.getDatabaseName(), identifier.getTableName()), + GetTableResponse.class, + headers()); + } catch (NoSuchResourceException e) { + throw new TableNotExistException(identifier); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier, e); } + + TableSchema schema = TableSchema.create(response.getSchemaId(), response.getSchema()); + return new TableMetadata(schema, response.getId()); } @Override @@ -520,56 +532,6 @@ public void close() throws Exception { } } - private Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException { - Preconditions.checkArgument(identifier.getSystemTableName() == null); - - GetTableResponse response; - try { - response = - client.get( - resourcePaths.table( - identifier.getDatabaseName(), identifier.getTableName()), - GetTableResponse.class, - headers()); - } catch (NoSuchResourceException e) { - throw new TableNotExistException(identifier); - } catch (ForbiddenException e) { - throw new TableNoPermissionException(identifier, e); - } - TableType tableType = getTableType(response.getSchema().options()); - if (tableType == TableType.FORMAT_TABLE) { - Schema schema = response.getSchema(); - return buildFormatTableByTableSchema( - identifier, - schema.options(), - schema.rowType(), - schema.partitionKeys(), - schema.comment()); - } - TableSchema schema = TableSchema.create(response.getSchemaId(), response.getSchema()); - FileStoreTable table = - FileStoreTableFactory.create( - fileIO(), - new Path(schema.options().get(PATH.key())), - schema, - new CatalogEnvironment( - identifier, - response.getId(), - Lock.emptyFactory(), - catalogLoader())); - if (tableType == TableType.OBJECT_TABLE) { - String objectLocation = table.coreOptions().objectLocation(); - checkNotNull(objectLocation, "Object location should not be null for object table."); - table = - ObjectTable.builder() - .underlyingTable(table) - .objectLocation(objectLocation) - .objectFileIO(this.fileIO()) - .build(); - } - return table; - } - private boolean isMetaStorePartitionedTable(Table table) { Options options = Options.fromMap(table.options()); return Boolean.TRUE.equals(options.get(METASTORE_PARTITIONED_TABLE)); @@ -579,17 +541,6 @@ private Map headers() { return catalogAuth.getHeaders(); } - private Table getSystemTable(Identifier identifier) throws TableNotExistException { - Table originTable = - getDataOrFormatTable( - new Identifier( - identifier.getDatabaseName(), - identifier.getTableName(), - identifier.getBranchName(), - null)); - return CatalogUtils.createSystemTable(identifier, originTable); - } - private ScheduledExecutorService tokenRefreshExecutor() { if (refreshExecutor == null) { synchronized (this) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java index 97acfe7299c5..992425ed5239 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java @@ -34,6 +34,7 @@ import java.util.Map; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** * A object table refers to a directory that contains multiple objects (files), Object table @@ -122,6 +123,7 @@ public ObjectTable.Builder objectLocation(String objectLocation) { } public ObjectTable build() { + checkNotNull(objectLocation, "Object location should not be null for object table."); return new ObjectTableImpl(underlyingTable, objectFileIO, objectLocation); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index c8b9192c1c38..1c453a1b3bcf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -409,7 +409,7 @@ public void testGetTable() throws Exception { catalog.getTable( Identifier.create( "test_db", "non_existing_table$snapshots"))) - .withMessage("Table test_db.non_existing_table does not exist."); + .withMessage("Table test_db.non_existing_table$snapshots does not exist."); // Get system table throws TableNotExistException when system table type does not exist assertThatExceptionOfType(Catalog.TableNotExistException.class) @@ -417,7 +417,7 @@ public void testGetTable() throws Exception { () -> catalog.getTable( Identifier.create("test_db", "non_existing_table$schema1"))) - .withMessage("Table test_db.non_existing_table does not exist."); + .withMessage("Table test_db.non_existing_table$schema1 does not exist."); // Get data table throws TableNotExistException when table does not exist assertThatExceptionOfType(Catalog.TableNotExistException.class) diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java index a0f820e7ad0b..4f150493110b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java @@ -18,12 +18,13 @@ package org.apache.paimon.rest; +import org.apache.paimon.CoreOptions; import org.apache.paimon.TableType; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.FileSystemCatalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.TableMetadata; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; @@ -166,7 +167,8 @@ protected void alterTableImpl(Identifier identifier, List changes) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { if (tableFullName2Schema.containsKey(identifier.getFullName())) { TableSchema schema = tableFullName2Schema.get(identifier.getFullName()); - if (CatalogUtils.getTableType(schema.options()) == TableType.FORMAT_TABLE) { + Options options = Options.fromMap(schema.options()); + if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) { throw new UnsupportedOperationException("Only data table support alter table."); } } else { @@ -189,12 +191,12 @@ public void createFormatTable(Identifier identifier, Schema schema) { } @Override - protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExistException { + protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { if (tableFullName2Schema.containsKey(identifier.getFullName())) { TableSchema tableSchema = tableFullName2Schema.get(identifier.getFullName()); - return new TableMeta(tableSchema, "uuid"); + return new TableMetadata(tableSchema, "uuid"); } - return super.getDataTableMeta(identifier); + return super.loadTableMetadata(identifier); } private Partition spec2Partition(Map spec) { diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 4e2d7b6acb95..438c6971aa8e 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -28,6 +28,7 @@ import org.apache.paimon.catalog.CatalogLockFactory; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; +import org.apache.paimon.catalog.TableMetadata; import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -40,10 +41,8 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.CatalogTableType; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.FormatTable; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -51,7 +50,6 @@ import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.PartitionPathUtils; -import org.apache.paimon.utils.Preconditions; import org.apache.paimon.view.View; import org.apache.paimon.view.ViewImpl; @@ -115,7 +113,7 @@ import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; -import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable; +import static org.apache.paimon.hive.HiveTableUtils.tryToFormatSchema; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED; import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES; @@ -354,7 +352,7 @@ public void createPartitions(Identifier identifier, List> pa Identifier.create(identifier.getDatabaseName(), identifier.getTableName()); Table hmsTable = getHmsTable(tableIdentifier); Path location = getTableLocation(tableIdentifier, hmsTable); - TableSchema schema = getDataTableSchema(tableIdentifier, hmsTable); + TableSchema schema = loadTableSchema(tableIdentifier, hmsTable); if (!metastorePartitioned(schema)) { return; @@ -395,7 +393,7 @@ public void createPartitions(Identifier identifier, List> pa @Override public void dropPartitions(Identifier identifier, List> partitions) throws TableNotExistException { - TableSchema schema = getDataTableSchema(identifier); + TableSchema schema = this.loadTableSchema(identifier); CoreOptions options = CoreOptions.fromMap(schema.options()); boolean tagToPart = options.tagToPartitionField() != null; if (metastorePartitioned(schema)) { @@ -429,7 +427,7 @@ public void dropPartitions(Identifier identifier, List> part public void alterPartitions( Identifier identifier, List partitions) throws TableNotExistException { - TableSchema tableSchema = getDataTableSchema(identifier); + TableSchema tableSchema = this.loadTableSchema(identifier); if (!tableSchema.partitionKeys().isEmpty() && new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) { for (org.apache.paimon.partition.Partition partition : partitions) { @@ -674,32 +672,41 @@ protected List listTablesImpl(String databaseName) { } @Override - protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExistException { - return getDataTableMeta(identifier, getHmsTable(identifier)); + protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException { + return loadTableMetadata(identifier, getHmsTable(identifier)); } - private TableMeta getDataTableMeta(Identifier identifier, Table table) + private TableMetadata loadTableMetadata(Identifier identifier, Table table) throws TableNotExistException { - return new TableMeta( - getDataTableSchema(identifier, table), + return new TableMetadata( + loadTableSchema(identifier, table), identifier.getFullName() + "." + table.getCreateTime()); } @Override - public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException { + public TableSchema loadTableSchema(Identifier identifier) throws TableNotExistException { Table table = getHmsTable(identifier); - return getDataTableSchema(identifier, table); + return loadTableSchema(identifier, table); } - private TableSchema getDataTableSchema(Identifier identifier, Table table) + private TableSchema loadTableSchema(Identifier identifier, Table table) throws TableNotExistException { - if (!isPaimonTable(table)) { - throw new TableNotExistException(identifier); + if (isPaimonTable(table)) { + return tableSchemaInFileSystem( + getTableLocation(identifier, table), + identifier.getBranchNameOrDefault()) + .orElseThrow(() -> new TableNotExistException(identifier)); + } + + if (!formatTableDisabled()) { + try { + Schema schema = tryToFormatSchema(table); + return TableSchema.create(0, schema); + } catch (UnsupportedOperationException ignored) { + } } - return tableSchemaInFileSystem( - getTableLocation(identifier, table), identifier.getBranchNameOrDefault()) - .orElseThrow(() -> new TableNotExistException(identifier)); + throw new TableNotExistException(identifier); } @Override @@ -835,39 +842,6 @@ public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfN renameHiveTable(fromView, toView); } - @Override - public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier) - throws TableNotExistException { - Preconditions.checkArgument(identifier.getSystemTableName() == null); - Table table = getHmsTable(identifier); - try { - TableMeta tableMeta = getDataTableMeta(identifier, table); - return FileStoreTableFactory.create( - fileIO, - getTableLocation(identifier, table), - tableMeta.schema(), - new CatalogEnvironment( - identifier, - tableMeta.uuid(), - Lock.factory( - lockFactory().orElse(null), - lockContext().orElse(null), - identifier), - catalogLoader())); - } catch (TableNotExistException ignore) { - } - - if (formatTableDisabled()) { - throw new TableNotExistException(identifier); - } - - try { - return convertToFormatTable(table); - } catch (UnsupportedOperationException e) { - throw new TableNotExistException(identifier); - } - } - @Override public void createFormatTable(Identifier identifier, Schema schema) { if (formatTableDisabled()) { @@ -1278,7 +1252,7 @@ private static boolean isPaimonTable(Table table) { private boolean isFormatTable(Table table) { try { - convertToFormatTable(table); + tryToFormatSchema(table); return true; } catch (UnsupportedOperationException e) { return false; diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java index 5e5af75e52b9..0007247bff21 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java @@ -18,8 +18,8 @@ package org.apache.paimon.hive; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.table.FormatTable; +import org.apache.paimon.options.Options; +import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FormatTable.Format; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; @@ -31,24 +31,25 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM; +import static org.apache.paimon.CoreOptions.FILE_FORMAT; +import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.CoreOptions.TYPE; +import static org.apache.paimon.TableType.FORMAT_TABLE; import static org.apache.paimon.catalog.Catalog.COMMENT_PROP; import static org.apache.paimon.hive.HiveCatalog.isView; import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; class HiveTableUtils { - public static FormatTable convertToFormatTable(Table hiveTable) { + public static Schema tryToFormatSchema(Table hiveTable) { if (isView(hiveTable)) { throw new UnsupportedOperationException("Hive view is not supported."); } - Identifier identifier = new Identifier(hiveTable.getDbName(), hiveTable.getTableName()); - Map options = new HashMap<>(hiveTable.getParameters()); + Options options = Options.fromMap(hiveTable.getParameters()); List partitionKeys = getFieldNames(hiveTable.getPartitionKeys()); RowType rowType = createRowType(hiveTable); String comment = options.remove(COMMENT_PROP); @@ -65,20 +66,19 @@ public static FormatTable convertToFormatTable(Table hiveTable) { } else if (inputFormat.contains("Text")) { format = Format.CSV; // hive default field delimiter is '\u0001' - options.put( - FIELD_DELIMITER.key(), - serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001")); + options.set( + FIELD_DELIMITER, serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001")); } else { throw new UnsupportedOperationException("Unsupported table: " + hiveTable); } - return FormatTable.builder() - .identifier(identifier) - .rowType(rowType) - .partitionKeys(partitionKeys) - .location(location) - .format(format) - .options(options) + Schema.Builder builder = Schema.newBuilder(); + rowType.getFields().forEach(f -> builder.column(f.name(), f.type(), f.description())); + options.set(PATH, location); + options.set(TYPE, FORMAT_TABLE); + options.set(FILE_FORMAT, format.name().toLowerCase()); + return builder.partitionKeys(partitionKeys) + .options(options.toMap()) .comment(comment) .build(); } diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index ea669d254fbd..ff2bd04d1f48 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -305,8 +305,6 @@ public void testListTablesLock() { Thread thread1 = new Thread( () -> { - System.out.println( - "First thread started at " + System.currentTimeMillis()); try { tables1.addAll(catalog.listTables(databaseName)); } catch (Catalog.DatabaseNotExistException e) { @@ -316,8 +314,6 @@ public void testListTablesLock() { Thread thread2 = new Thread( () -> { - System.out.println( - "Second thread started at " + System.currentTimeMillis()); try { tables2.addAll(catalog.listTables(databaseName)); } catch (Catalog.DatabaseNotExistException e) {