diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 0d89becbb286ce..4eb267e524fd65 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2216,6 +2216,21 @@ public class Config extends ConfigBase { "Sample size for hive row count estimation."}) public static int hive_stats_partition_sample_size = 3000; + @ConfField(mutable = true, masterOnly = true, description = { + "Hive创建外部表默认指定的input format", + "Default hive input format for creating table."}) + public static String hive_default_input_format = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + + @ConfField(mutable = true, masterOnly = true, description = { + "Hive创建外部表默认指定的output format", + "Default hive output format for creating table."}) + public static String hive_default_output_format = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; + + @ConfField(mutable = true, masterOnly = true, description = { + "Hive创建外部表默认指定的SerDe类", + "Default hive serde class for creating table."}) + public static String hive_default_serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + @ConfField public static int statistics_sql_parallel_exec_instance_num = 1; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index d406eb795ec3d2..6375988529405a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -415,8 +415,8 @@ private void processModifyEngineInternal(Database db, Table externalTable, } odbcTable.writeLock(); try { - db.dropTable(mysqlTable.getName()); - db.createTable(odbcTable); + db.unregisterTable(mysqlTable.getName()); + db.registerTable(odbcTable); if (!isReplay) { ModifyTableEngineOperationLog log = new ModifyTableEngineOperationLog(db.getId(), externalTable.getId(), prop); @@ -595,17 +595,17 @@ private void replaceTableInternal(Database db, OlapTable origTable, OlapTable ne String newTblName = newTbl.getName(); // drop origin table and new table - db.dropTable(oldTblName); - db.dropTable(newTblName); + db.unregisterTable(oldTblName); + db.unregisterTable(newTblName); // rename new table name to origin table name and add it to database newTbl.checkAndSetName(oldTblName, false); - db.createTable(newTbl); + db.registerTable(newTbl); if (swapTable) { // rename origin table name to new table name and add it to database origTable.checkAndSetName(newTblName, false); - db.createTable(origTable); + db.registerTable(origTable); } else { // not swap, the origin table is not used anymore, need to drop all its tablets. 
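For context on how the three new Config defaults above are meant to be consumed, here is a minimal, self-contained sketch (stand-in constants and a plain map, not the actual Config or HiveMetadataOps classes): table-level properties take precedence, and the ORC classes configured above act as fallbacks, mirroring the getOrDefault lookups in HiveMetadataOps.createTable later in this patch.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: stand-ins for Config.hive_default_input_format / _output_format / _serde.
public class HiveFormatDefaultsSketch {
    static final String DEFAULT_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
    static final String DEFAULT_OUTPUT_FORMAT = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
    static final String DEFAULT_SERDE = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";

    // Table-level properties win; otherwise the catalog-wide ORC defaults apply.
    static Map<String, String> resolveStorageFormat(Map<String, String> tableProps) {
        Map<String, String> resolved = new HashMap<>();
        resolved.put("input_format", tableProps.getOrDefault("input_format", DEFAULT_INPUT_FORMAT));
        resolved.put("output_format", tableProps.getOrDefault("output_format", DEFAULT_OUTPUT_FORMAT));
        resolved.put("serde", tableProps.getOrDefault("serde", DEFAULT_SERDE));
        return resolved;
    }

    public static void main(String[] args) {
        // No explicit properties: all three entries fall back to the ORC defaults.
        System.out.println(resolveStorageFormat(new HashMap<>()));
    }
}
```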
Env.getCurrentEnv().onEraseOlapTable(origTable, isReplay); @@ -637,8 +637,8 @@ private void modifyViewDef(Database db, View view, String inlineViewDef, long sq } view.setNewFullSchema(newFullSchema); String viewName = view.getName(); - db.dropTable(viewName); - db.createTable(view); + db.unregisterTable(viewName); + db.registerTable(view); AlterViewInfo alterViewInfo = new AlterViewInfo(db.getId(), view.getId(), inlineViewDef, newFullSchema, sqlMode); @@ -673,8 +673,8 @@ public void replayModifyViewDef(AlterViewInfo alterViewInfo) throws MetaNotFound } view.setNewFullSchema(newFullSchema); - db.dropTable(viewName); - db.createTable(view); + db.unregisterTable(viewName); + db.registerTable(view); LOG.info("replay modify view[{}] definition to {}", viewName, inlineViewDef); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index cca3946f3d9979..6f43fab385cdb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -39,7 +39,6 @@ import org.apache.doris.common.util.ParseUtil; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; -import org.apache.doris.common.util.Util; import org.apache.doris.datasource.es.EsUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -277,6 +276,11 @@ public List getIndexes() { @Override public void analyze(Analyzer analyzer) throws UserException { + if (Config.isCloudMode() && properties != null + && properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE)) { + // FIXME: MOW is not supported in cloud mode yet. 
+ properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "false"); + } if (Strings.isNullOrEmpty(engineName) || engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) { this.properties = maybeRewriteByAutoBucket(distributionDesc, properties); } @@ -284,8 +288,6 @@ public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); tableName.analyze(analyzer); FeNameFormat.checkTableName(tableName.getTbl()); - // disallow external catalog - Util.prohibitExternalCatalog(tableName.getCtl(), this.getClass().getSimpleName()); InternalDatabaseUtil.checkDatabase(tableName.getDb(), ConnectContext.get()); if (!Env.getCurrentEnv().getAccessManager() .checkTblPriv(ConnectContext.get(), tableName.getDb(), tableName.getTbl(), PrivPredicate.CREATE)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropTableStmt.java index d2ff04186fb41a..70167d744daf5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropTableStmt.java @@ -22,7 +22,6 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.common.util.InternalDatabaseUtil; -import org.apache.doris.common.util.Util; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -84,8 +83,6 @@ public void analyze(Analyzer analyzer) throws UserException { tableName.setDb(analyzer.getDefaultDb()); } tableName.analyze(analyzer); - // disallow external catalog - Util.prohibitExternalCatalog(tableName.getCtl(), this.getClass().getSimpleName()); InternalDatabaseUtil.checkDatabase(tableName.getDb(), ConnectContext.get()); // check access if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(ConnectContext.get(), tableName.getDb(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index ec4ca1439556a0..fcfea3b2b24c83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -904,7 +904,7 @@ private void checkAndPrepareMeta() { } tbl.writeLock(); try { - if (!db.createTable(tbl)) { + if (!db.registerTable(tbl)) { status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName() + " already exist in db: " + db.getFullName()); return; @@ -1285,7 +1285,7 @@ private void replayCheckAndPrepareMeta() { db.writeLock(); restoreTbl.writeLock(); try { - db.createTable(restoreTbl); + db.registerTable(restoreTbl); } finally { restoreTbl.writeUnlock(); db.writeUnlock(); @@ -1935,7 +1935,7 @@ public void cancelInternal(boolean isReplay) { } } } - db.dropTable(restoreTbl.getName()); + db.unregisterTable(restoreTbl.getName()); } finally { restoreTbl.writeUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index 3ae630744540c5..51a0c5ba9323d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -644,7 +644,7 @@ private void recoverAllTables(RecycleDatabaseInfo dbInfo) throws DdlException { } Table table = tableInfo.getTable(); - db.createTable(table); + db.registerTable(table); LOG.info("recover db[{}] with table[{}]: {}", dbId, table.getId(), table.getName()); 
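As a quick illustration of the register/unregister contract that RestoreJob relies on above (note the if (!db.registerTable(tbl)) check), here is a minimal in-memory sketch with stand-in types, not the real org.apache.doris.catalog.Database: registering only adds the table to the name and id maps and signals a name clash through the boolean result, while unregistering is a no-op for unknown names.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a stripped-down analogue of Database.registerTable / unregisterTable.
class MiniDatabase {
    private final Map<Long, String> idToTable = new HashMap<>();
    private final Map<String, Long> nameToId = new HashMap<>();

    // Returns false if a table with the same name is already registered,
    // which is why callers such as RestoreJob check the result.
    boolean registerTable(long id, String name) {
        if (nameToId.containsKey(name)) {
            return false;
        }
        nameToId.put(name, id);
        idToTable.put(id, name);
        return true;
    }

    // Removes the table from the in-memory maps only; nothing is dropped remotely.
    void unregisterTable(String name) {
        Long id = nameToId.remove(name);
        if (id != null) {
            idToTable.remove(id);
        }
    }
}
```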
iterator.remove(); idToRecycleTime.remove(table.getId()); @@ -739,7 +739,7 @@ private synchronized boolean innerRecoverTable(Database db, Table table, String } } - db.createTable(table); + db.registerTable(table); if (isReplay) { iterator.remove(); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index c4475bc3c3eaf3..fd709963a7b073 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -416,25 +416,26 @@ public Pair createTableWithLock( } } - public boolean createTable(Table table) { + public boolean registerTable(TableIf table) { boolean result = true; - table.setQualifiedDbName(fullQualifiedName); - String tableName = table.getName(); + Table olapTable = (Table) table; + olapTable.setQualifiedDbName(fullQualifiedName); + String tableName = olapTable.getName(); if (Env.isStoredTableNamesLowerCase()) { tableName = tableName.toLowerCase(); } if (isTableExist(tableName)) { result = false; } else { - idToTable.put(table.getId(), table); - nameToTable.put(table.getName(), table); + idToTable.put(olapTable.getId(), olapTable); + nameToTable.put(olapTable.getName(), olapTable); lowerCaseToTableName.put(tableName.toLowerCase(), tableName); } - table.unmarkDropped(); + olapTable.unmarkDropped(); return result; } - public void dropTable(String tableName) { + public void unregisterTable(String tableName) { if (Env.isStoredTableNamesLowerCase()) { tableName = tableName.toLowerCase(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java index 454851620e2b16..6c0d46ffe98824 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseIf.java @@ -263,7 +263,18 @@ default OlapTable getOlapTableOrAnalysisException(String tableName) throws Analy return (OlapTable) table; } - void dropTable(String tableName); + /** + * register table to memory + * @param table created table + * @return true if add to memory + */ + boolean registerTable(TableIf table); + + /** + * unregister table from memory + * @param tableName table name + */ + void unregisterTable(String tableName); CatalogIf getCatalog(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 76ff70589b9e24..bbcb14207d40e5 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2954,7 +2954,7 @@ public Frontend getFeByName(String name) { // The interface which DdlExecutor needs. public void createDb(CreateDbStmt stmt) throws DdlException { - getInternalCatalog().createDb(stmt); + getCurrentCatalog().createDb(stmt); } // For replay edit log, need't lock metadata @@ -2967,7 +2967,7 @@ public void replayCreateDb(Database db) { } public void dropDb(DropDbStmt stmt) throws DdlException { - getInternalCatalog().dropDb(stmt); + getCurrentCatalog().dropDb(stmt); } public void replayDropDb(String dbName, boolean isForceDrop, Long recycleTime) throws DdlException { @@ -3039,7 +3039,7 @@ public void replayRenameDatabase(String dbName, String newDbName) { * 11. 
add this table to ColocateGroup if necessary */ public void createTable(CreateTableStmt stmt) throws UserException { - getInternalCatalog().createTable(stmt); + getCurrentCatalog().createTable(stmt); } public void createTableLike(CreateTableLikeStmt stmt) throws DdlException { @@ -3659,7 +3659,7 @@ public void replayAlterExternalTableSchema(String dbName, String tableName, List // Drop table public void dropTable(DropTableStmt stmt) throws DdlException { - getInternalCatalog().dropTable(stmt); + getCurrentCatalog().dropTable(stmt); } public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay, @@ -4308,8 +4308,8 @@ public void renameTable(Database db, Table table, String newTableName) throws Dd table.setName(newTableName); } - db.dropTable(oldTableName); - db.createTable(table); + db.unregisterTable(oldTableName); + db.registerTable(table); TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), newTableName); editLog.logTableRename(tableInfo); @@ -4341,9 +4341,9 @@ public void replayRenameTable(TableInfo tableInfo) throws MetaNotFoundException table.writeLock(); try { String tableName = table.getName(); - db.dropTable(tableName); + db.unregisterTable(tableName); table.setName(newTableName); - db.createTable(table); + db.registerTable(table); LOG.info("replay rename table[{}] to {}", tableName, newTableName); } finally { table.writeUnlock(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java index b72850b3e9493a..2a612066c76a6d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InfoSchemaDb.java @@ -29,12 +29,12 @@ public InfoSchemaDb() { @Override protected void initTables() { for (Table table : SchemaTable.TABLE_MAP.values()) { - super.createTable(table); + super.registerTable(table); } } @Override - public boolean createTable(Table table) { + public boolean registerTable(TableIf table) { return false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlCompatibleDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlCompatibleDatabase.java index 12d915321ede50..028fe186b2c598 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlCompatibleDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlCompatibleDatabase.java @@ -50,12 +50,12 @@ public MysqlCompatibleDatabase(long id, String name) { * @note: Rename a table of mysql database in MYSQL ls allowed. 
*/ @Override - public boolean createTable(Table table) { - return super.createTable(table); + public boolean registerTable(TableIf table) { + return super.registerTable(table); } @Override - public void dropTable(String name) { + public void unregisterTable(String name) { // Do nothing } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlDb.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlDb.java index 0e09995d94c0b7..67d7fc8ecca078 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlDb.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlDb.java @@ -52,7 +52,7 @@ public MysqlDb() { public void initTables() {} @Override - public boolean createTable(Table table) { + public boolean registerTable(TableIf table) { return false; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java index 7fd240aa8d07d2..ffae9420edeb48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java @@ -17,6 +17,10 @@ package org.apache.doris.datasource; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; @@ -24,6 +28,7 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.UserException; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -177,4 +182,12 @@ default CatalogLog constructEditLog() { boolean enableAutoAnalyze(); ConcurrentHashMap getIdToDb(); + + void createDb(CreateDbStmt stmt) throws DdlException; + + void dropDb(DropDbStmt stmt) throws DdlException; + + void createTable(CreateTableStmt stmt) throws UserException; + + void dropTable(DropTableStmt stmt) throws DdlException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java index c9b394fcb7de45..41631dc82aa06d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java @@ -44,7 +44,6 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.hive.HMSExternalCatalog; -import org.apache.doris.datasource.hive.HMSExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.OperationType; @@ -744,16 +743,16 @@ public void replayRefreshExternalTable(ExternalObjectLog log) { } } - public void dropExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists) + public void unregisterExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists) throws DdlException { - CatalogIf catalog = nameToCatalog.get(catalogName); + CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { throw new DdlException("No catalog found with name: " + catalogName); } if (!(catalog instanceof ExternalCatalog)) { throw new DdlException("Only support drop ExternalCatalog Tables"); } 
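The new CatalogIf methods above are what let Env forward DDL to whichever catalog the session currently uses (see the getCurrentCatalog() changes in Env.java earlier in this patch). A minimal sketch of that dispatch with stand-in types, not the real Doris interfaces:

```java
// Sketch only: the idea behind Env.createTable(stmt) -> getCurrentCatalog().createTable(stmt).
interface MiniCatalogIf {
    void createTable(String db, String table) throws Exception;

    void dropTable(String db, String table) throws Exception;
}

class MiniEnv {
    private MiniCatalogIf currentCatalog;

    MiniEnv(MiniCatalogIf initial) {
        this.currentCatalog = initial;
    }

    // Switched when the session changes catalogs, e.g. to a Hive catalog.
    void setCurrentCatalog(MiniCatalogIf catalog) {
        this.currentCatalog = catalog;
    }

    // The same statement reaches InternalCatalog or an ExternalCatalog
    // depending on the current catalog, instead of always going internal.
    void createTable(String db, String table) throws Exception {
        currentCatalog.createTable(db, table);
    }

    void dropTable(String db, String table) throws Exception {
        currentCatalog.dropTable(db, table);
    }
}
```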
- DatabaseIf db = catalog.getDbNullable(dbName); + ExternalDatabase db = ((ExternalCatalog) catalog).getDbNullable(dbName); if (db == null) { if (!ignoreIfExists) { throw new DdlException("Database " + dbName + " does not exist in catalog " + catalog.getName()); @@ -771,10 +770,7 @@ public void dropExternalTable(String dbName, String tableName, String catalogNam db.writeLock(); try { - db.dropTable(table.getName()); - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache( - catalog.getId(), db.getFullName(), table.getName()); - ((HMSExternalDatabase) db).setLastUpdateTime(System.currentTimeMillis()); + db.unregisterTable(table.getName()); } finally { db.writeUnlock(); } @@ -791,9 +787,9 @@ public boolean externalTableExistInLocal(String dbName, String tableName, String return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName); } - public void createExternalTableFromEvent(String dbName, String tableName, - String catalogName, long updateTime, - boolean ignoreIfExists) throws DdlException { + public void registerExternalTableFromEvent(String dbName, String tableName, + String catalogName, long updateTime, + boolean ignoreIfExists) throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { throw new DdlException("No catalog found with name: " + catalogName); @@ -816,7 +812,6 @@ public void createExternalTableFromEvent(String dbName, String tableName, } return; } - long tblId = Env.getCurrentEnv().getExternalMetaIdMgr().getTblId(catalog.getId(), dbName, tableName); // -1L means it will be dropped later, ignore if (tblId == ExternalMetaIdMgr.META_ID_FOR_NOT_EXISTS) { @@ -825,18 +820,16 @@ public void createExternalTableFromEvent(String dbName, String tableName, db.writeLock(); try { - ((HMSExternalDatabase) db).createTable(tableName, tblId); - ((HMSExternalDatabase) db).setLastUpdateTime(System.currentTimeMillis()); - table = db.getTableNullable(tableName); - if (table != null) { - ((HMSExternalTable) table).setEventUpdateTime(updateTime); - } + HMSExternalTable namedTable = new HMSExternalTable(tblId, tableName, dbName, (HMSExternalCatalog) catalog); + namedTable.setUpdateTime(updateTime); + db.registerTable(namedTable); } finally { db.writeUnlock(); } } - public void dropExternalDatabase(String dbName, String catalogName, boolean ignoreIfNotExists) throws DdlException { + public void unregisterExternalDatabase(String dbName, String catalogName, boolean ignoreIfNotExists) + throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { throw new DdlException("No catalog found with name: " + catalogName); @@ -851,12 +844,11 @@ public void dropExternalDatabase(String dbName, String catalogName, boolean igno } return; } - - ((HMSExternalCatalog) catalog).dropDatabase(dbName); - Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(catalog.getId(), dbName); + ((HMSExternalCatalog) catalog).unregisterDatabase(dbName); } - public void createExternalDatabase(String dbName, String catalogName, boolean ignoreIfExists) throws DdlException { + public void registerExternalDatabase(String dbName, String catalogName, boolean ignoreIfExists) + throws DdlException { CatalogIf catalog = nameToCatalog.get(catalogName); if (catalog == null) { throw new DdlException("No catalog found with name: " + catalogName); @@ -878,7 +870,7 @@ public void createExternalDatabase(String dbName, String catalogName, boolean ig return; } - ((HMSExternalCatalog) catalog).createDatabase(dbId, dbName); + ((HMSExternalCatalog) 
catalog).registerDatabase(dbId, dbName); } public void addExternalPartitions(String catalogName, String dbName, String tableName, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DatabaseMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DatabaseMetadata.java new file mode 100644 index 00000000000000..97905ce40ea3c8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DatabaseMetadata.java @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +public interface DatabaseMetadata { + String getDbName(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/DorisTypeVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/DorisTypeVisitor.java new file mode 100644 index 00000000000000..54a35acf595a59 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/DorisTypeVisitor.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Utils to visit doris and iceberg type + * @param + */ +public class DorisTypeVisitor { + public static T visit(Type type, DorisTypeVisitor visitor) { + if (type instanceof StructType) { + List fields = ((StructType) type).getFields(); + List fieldResults = Lists.newArrayListWithExpectedSize(fields.size()); + + for (StructField field : fields) { + fieldResults.add(visitor.field( + field, + visit(field.getType(), visitor))); + } + + return visitor.struct((StructType) type, fieldResults); + } else if (type instanceof MapType) { + return visitor.map((MapType) type, + visit(((MapType) type).getKeyType(), visitor), + visit(((MapType) type).getValueType(), visitor)); + } else if (type instanceof ArrayType) { + return visitor.array( + (ArrayType) type, + visit(((ArrayType) type).getItemType(), visitor)); + } else { + return visitor.atomic(type); + } + } + + public T struct(StructType struct, List fieldResults) { + return null; + } + + public T field(StructField field, T typeResult) { + return null; + } + + public T array(ArrayType array, T elementResult) { + return null; + } + + public T map(MapType map, T keyResult, T valueResult) { + return null; + } + + public T atomic(Type atomic) { + return null; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 03ad99ce77f96f..0eabffce4457a3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -17,6 +17,10 @@ package org.apache.doris.datasource; +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -25,6 +29,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.Util; @@ -35,6 +40,7 @@ import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase; +import org.apache.doris.datasource.operations.ExternalMetadataOps; import org.apache.doris.datasource.paimon.PaimonExternalDatabase; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.test.TestExternalDatabase; @@ -100,6 +106,7 @@ public abstract class ExternalCatalog protected Map dbNameToId = Maps.newConcurrentMap(); private boolean objectCreated = false; protected boolean invalidCacheInInit = true; + protected ExternalMetadataOps metadataOps; private ExternalSchemaCache schemaCache; private String comment; @@ -129,16 +136,21 @@ public Configuration getConfiguration() { /** * set some default properties when creating catalog + * @return list of 
database names in this catalog */ + protected List listDatabaseNames() { + if (metadataOps == null) { + throw new UnsupportedOperationException("Unsupported operation: " + + "listDatabaseNames from remote client when init catalog with " + logType.name()); + } else { + return metadataOps.listDatabaseNames(); + } + } + public void setDefaultPropsWhenCreating(boolean isReplay) throws DdlException { } - /** - * @return list of database names in this catalog - */ - protected abstract List listDatabaseNames(); - /** * @param dbName * @return names of tables in specified database @@ -578,12 +590,72 @@ public void addDatabaseForTest(ExternalDatabase db) { dbNameToId.put(ClusterNamespace.getNameFromFullName(db.getFullName()), db.getId()); } - public void dropDatabase(String dbName) { - throw new NotImplementedException("dropDatabase not implemented"); + @Override + public void createDb(CreateDbStmt stmt) throws DdlException { + makeSureInitialized(); + if (metadataOps == null) { + LOG.warn("dropDatabase not implemented"); + return; + } + try { + metadataOps.createDb(stmt); + } catch (Exception e) { + LOG.warn("Failed to create a database.", e); + throw e; + } + } + + @Override + public void dropDb(DropDbStmt stmt) throws DdlException { + makeSureInitialized(); + if (metadataOps == null) { + LOG.warn("dropDatabase not implemented"); + return; + } + try { + metadataOps.dropDb(stmt); + } catch (Exception e) { + LOG.warn("Failed to drop a database.", e); + throw e; + } + } + + @Override + public void createTable(CreateTableStmt stmt) throws UserException { + makeSureInitialized(); + if (metadataOps == null) { + LOG.warn("createTable not implemented"); + return; + } + try { + metadataOps.createTable(stmt); + } catch (Exception e) { + LOG.warn("Failed to create a table.", e); + throw e; + } + } + + @Override + public void dropTable(DropTableStmt stmt) throws DdlException { + makeSureInitialized(); + if (metadataOps == null) { + LOG.warn("dropTable not implemented"); + return; + } + try { + metadataOps.dropTable(stmt); + } catch (Exception e) { + LOG.warn("Failed to drop a table", e); + throw e; + } + } + + public void unregisterDatabase(String dbName) { + throw new NotImplementedException("unregisterDatabase not implemented"); } - public void createDatabase(long dbId, String dbName) { - throw new NotImplementedException("createDatabase not implemented"); + public void registerDatabase(long dbId, String dbName) { + throw new NotImplementedException("registerDatabase not implemented"); } public Map getIncludeDatabaseMap() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java index a4f4afde7eb002..5411927eb570b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java @@ -154,7 +154,7 @@ public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) { } } for (int i = 0; i < log.getCreateCount(); i++) { - T table = getExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog); + T table = newExternalTable(log.getCreateTableNames().get(i), log.getCreateTableIds().get(i), catalog); tmpTableNameToId.put(table.getName(), table.getId()); tmpIdToTbl.put(table.getId(), table); } @@ -190,7 +190,7 @@ protected void init() { } else { tblId = Env.getCurrentEnv().getNextId(); tmpTableNameToId.put(tableName, tblId); - T table = getExternalTable(tableName, tblId, 
extCatalog); + T table = newExternalTable(tableName, tblId, extCatalog); tmpIdToTbl.put(tblId, table); initDatabaseLog.addCreateTable(tblId, tableName); } @@ -205,7 +205,7 @@ protected void init() { Env.getCurrentEnv().getEditLog().logInitExternalDb(initDatabaseLog); } - protected abstract T getExternalTable(String tableName, long tblId, ExternalCatalog catalog); + protected abstract T newExternalTable(String tableName, long tblId, ExternalCatalog catalog); public T getTableForReplay(long tableId) { return idToTbl.get(tableId); @@ -364,8 +364,19 @@ public void gsonPostProcess() throws IOException { } @Override - public void dropTable(String tableName) { - throw new NotImplementedException("dropTable() is not implemented"); + public void unregisterTable(String tableName) { + if (LOG.isDebugEnabled()) { + LOG.debug("create table [{}]", tableName); + } + Long tableId = tableNameToId.remove(tableName); + if (tableId == null) { + LOG.warn("table [{}] does not exist when drop", tableName); + return; + } + idToTbl.remove(tableId); + setLastUpdateTime(System.currentTimeMillis()); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache( + extCatalog.getId(), getFullName(), tableName); } @Override @@ -374,8 +385,16 @@ public CatalogIf getCatalog() { } // Only used for sync hive metastore event - public void createTable(String tableName, long tableId) { - throw new NotImplementedException("createTable() is not implemented"); + public boolean registerTable(TableIf tableIf) { + long tableId = tableIf.getId(); + String tableName = tableIf.getName(); + if (LOG.isDebugEnabled()) { + LOG.debug("create table [{}]", tableName); + } + tableNameToId.put(tableName, tableId); + idToTbl.put(tableId, newExternalTable(tableName, tableId, extCatalog)); + setLastUpdateTime(System.currentTimeMillis()); + return true; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java index d756e803a23fe0..ce3220a05f9266 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java @@ -333,6 +333,10 @@ public long getUpdateTime() { return this.schemaUpdateTime; } + public void setUpdateTime(long schemaUpdateTime) { + this.schemaUpdateTime = schemaUpdateTime; + } + @Override public long getLastCheckTime() { return 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index ad8584a5e7bab7..e2e066f5c7266e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -134,7 +134,7 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.es.EsRepository; import org.apache.doris.datasource.hive.HMSCachedClient; -import org.apache.doris.datasource.hive.HMSCachedClientFactory; +import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; @@ -947,7 +947,7 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) table); } - db.dropTable(table.getName()); + db.unregisterTable(table.getName()); 
StopWatch watch = StopWatch.createStarted(); Env.getCurrentRecycleBin().recycleTable(db.getId(), table, isReplay, isForceDrop, recycleTime); watch.stop(); @@ -2152,7 +2152,7 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep // set time series compaction time threshold long timeSeriesCompactionTimeThresholdSeconds - = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE; try { timeSeriesCompactionTimeThresholdSeconds = PropertyAnalyzer .analyzeTimeSeriesCompactionTimeThresholdSeconds(properties); @@ -2163,7 +2163,7 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep // set time series compaction empty rowsets threshold long timeSeriesCompactionEmptyRowsetsThreshold - = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; + = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; try { timeSeriesCompactionEmptyRowsetsThreshold = PropertyAnalyzer .analyzeTimeSeriesCompactionEmptyRowsetsThreshold(properties); @@ -2799,7 +2799,7 @@ private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlExcept if (!Strings.isNullOrEmpty(hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION))) { hiveConf.set(HMSProperties.HIVE_VERSION, hiveTable.getHiveProperties().get(HMSProperties.HIVE_VERSION)); } - HMSCachedClient client = new HMSCachedClientFactory().createCachedClient(hiveConf, 1, null); + HMSCachedClient client = HiveMetadataOps.createCachedClient(hiveConf, 1, null); if (!client.tableExists(hiveTable.getHiveDb(), hiveTable.getHiveTable())) { throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable())); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableMetadata.java new file mode 100644 index 00000000000000..e266c519e4241e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableMetadata.java @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource; + +import java.util.Map; + +public interface TableMetadata { + String getDbName(); + + String getTableName(); + + Map getProperties(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalDatabase.java index 7d576bba1b01c7..cf4129f25a5cf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalDatabase.java @@ -38,7 +38,7 @@ public EsExternalDatabase(ExternalCatalog extCatalog, long id, String name) { } @Override - protected EsExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + protected EsExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) { return new EsExternalTable(tblId, tableName, name, (EsExternalCatalog) extCatalog); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java index 6e7f45aaa415c6..c26de66058bcbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java @@ -18,6 +18,8 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableName; +import org.apache.doris.datasource.DatabaseMetadata; +import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.hadoop.hive.common.ValidWriteIdList; @@ -78,4 +80,14 @@ NotificationEventResponse getNextNotification(long lastEventId, void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List partitionNames, long timeoutMs); + + String getCatalogLocation(String catalogName); + + void createDatabase(DatabaseMetadata catalogDatabase); + + void dropDatabase(String dbName); + + void dropTable(String dbName, String tableName); + + void createTable(TableMetadata catalogTable, boolean ignoreIfExists); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClientFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClientFactory.java deleted file mode 100644 index 10e81993d03638..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClientFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.datasource.hive; - -import org.apache.doris.catalog.JdbcResource; -import org.apache.doris.datasource.jdbc.client.JdbcClient; -import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.hive.conf.HiveConf; - -public class HMSCachedClientFactory { - public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize, - JdbcClientConfig jdbcClientConfig) { - if (hiveConf != null) { - return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize); - } - Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null"); - String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl()); - switch (dbType) { - case JdbcResource.POSTGRESQL: - return new PostgreSQLJdbcHMSCachedClient(jdbcClientConfig); - default: - throw new IllegalArgumentException("Unsupported DB type: " + dbType); - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index afbbd36f02431e..11c5ad72c2b5e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -31,6 +31,7 @@ import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; +import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; @@ -51,9 +52,6 @@ public class HMSExternalCatalog extends ExternalCatalog { private static final Logger LOG = LogManager.getLogger(HMSExternalCatalog.class); - private static final int MIN_CLIENT_POOL_SIZE = 8; - protected HMSCachedClient client; - public static final String FILE_META_CACHE_TTL_SECOND = "file.meta.cache.ttl-second"; // broker name for file split and query scan. 
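The factory deleted above is not gone: its selection logic reappears as HiveMetadataOps.createCachedClient later in this patch. For reference, a minimal sketch of that selection rule with stand-in types, assuming the usual jdbc:postgresql:// URL prefix for PostgreSQL: a non-null HiveConf means a Thrift metastore client, otherwise the JDBC URL decides, with PostgreSQL the only JDBC-backed implementation so far.

```java
// Sketch only: the client-selection rule now living in HiveMetadataOps.createCachedClient.
class ClientSelectionSketch {
    enum ClientKind { THRIFT_HMS, POSTGRESQL_JDBC }

    // hiveConfPresent stands in for "hiveConf != null"; jdbcUrl for JdbcClientConfig.getJdbcUrl().
    static ClientKind choose(boolean hiveConfPresent, String jdbcUrl) {
        if (hiveConfPresent) {
            return ClientKind.THRIFT_HMS;
        }
        if (jdbcUrl == null) {
            throw new IllegalArgumentException("hiveConf and jdbcClientConfig are both null");
        }
        if (jdbcUrl.startsWith("jdbc:postgresql:")) {
            return ClientKind.POSTGRESQL_JDBC;
        }
        throw new IllegalArgumentException("Unsupported DB type for url: " + jdbcUrl);
    }
}
```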
public static final String BIND_BROKER_NAME = "broker.name"; @@ -146,10 +144,9 @@ protected void initLocalObjectsImpl() { HadoopUGI.tryKrbLogin(this.getName(), AuthenticationConfig.getKerberosConfig(hiveConf, AuthenticationConfig.HIVE_KERBEROS_PRINCIPAL, AuthenticationConfig.HIVE_KERBEROS_KEYTAB)); - client = HMSCachedClientFactory.createCachedClient(hiveConf, - Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), - jdbcClientConfig); + metadataOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this); } + } @Override @@ -161,13 +158,13 @@ public List listTableNames(SessionContext ctx, String dbName) { hmsExternalDatabase.getTables().forEach(table -> names.add(table.getName())); return names; } else { - return client.getAllTables(ClusterNamespace.getNameFromFullName(dbName)); + return metadataOps.listTableNames(ClusterNamespace.getNameFromFullName(dbName)); } } @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { - return client.tableExists(ClusterNamespace.getNameFromFullName(dbName), tblName); + return metadataOps.tableExist(ClusterNamespace.getNameFromFullName(dbName), tblName); } @Override @@ -182,11 +179,11 @@ public boolean tableExistInLocal(String dbName, String tblName) { public HMSCachedClient getClient() { makeSureInitialized(); - return client; + return ((HiveMetadataOps) metadataOps).getClient(); } @Override - public void dropDatabase(String dbName) { + public void unregisterDatabase(String dbName) { if (LOG.isDebugEnabled()) { LOG.debug("drop database [{}]", dbName); } @@ -195,10 +192,11 @@ public void dropDatabase(String dbName) { LOG.warn("drop database [{}] failed", dbName); } idToDb.remove(dbId); + Env.getCurrentEnv().getExtMetaCacheMgr().invalidateDbCache(getId(), dbName); } @Override - public void createDatabase(long dbId, String dbName) { + public void registerDatabase(long dbId, String dbName) { if (LOG.isDebugEnabled()) { LOG.debug("create database [{}]", dbName); } @@ -234,8 +232,4 @@ public String getHiveMetastoreUris() { public String getHiveVersion() { return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, ""); } - - protected List listDatabaseNames() { - return client.getAllDatabases(); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalDatabase.java index 2287909f26a84a..8a16e66996f37a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalDatabase.java @@ -17,19 +17,15 @@ package org.apache.doris.datasource.hive; +import org.apache.doris.catalog.TableIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.InitDatabaseLog; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - /** * Hive metastore external database. */ public class HMSExternalDatabase extends ExternalDatabase { - private static final Logger LOG = LogManager.getLogger(HMSExternalDatabase.class); - /** * Create HMS external database. 
* @@ -42,7 +38,7 @@ public HMSExternalDatabase(ExternalCatalog extCatalog, long id, String name) { } @Override - protected HMSExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + protected HMSExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) { return new HMSExternalTable(tblId, tableName, name, (HMSExternalCatalog) extCatalog); } @@ -52,25 +48,12 @@ public void addTableForTest(HMSExternalTable tbl) { } @Override - public void dropTable(String tableName) { - if (LOG.isDebugEnabled()) { - LOG.debug("replayDropTableFromEvent [{}]", tableName); - } - Long tableId = tableNameToId.remove(tableName); - if (tableId == null) { - LOG.warn("replayDropTableFromEvent [{}] failed", tableName); - return; - } - idToTbl.remove(tableId); - } - - @Override - public void createTable(String tableName, long tableId) { - if (LOG.isDebugEnabled()) { - LOG.debug("create table [{}]", tableName); + public boolean registerTable(TableIf tableIf) { + super.registerTable(tableIf); + HMSExternalTable table = getTableNullable(tableIf.getName()); + if (table != null) { + table.setEventUpdateTime(tableIf.getUpdateTime()); } - tableNameToId.put(tableName, tableId); - HMSExternalTable table = getExternalTable(tableName, tableId, extCatalog); - idToTbl.put(tableId, table); + return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDatabaseMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDatabaseMetadata.java new file mode 100644 index 00000000000000..50a80db3962497 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveDatabaseMetadata.java @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.hive; + +import org.apache.doris.datasource.DatabaseMetadata; + +import lombok.Data; + +import java.util.Map; + +@Data +public class HiveDatabaseMetadata implements DatabaseMetadata { + private String dbName; + private String locationUri; + private Map properties; + private String comment; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java index d5656778cdf535..037fbe02d68b4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreClientHelper.java @@ -150,7 +150,7 @@ public static String getFormat(String input) throws DdlException { } } - public static IMetaStoreClient getClient(String metaStoreUris) throws DdlException { + private static IMetaStoreClient getClient(String metaStoreUris) throws DdlException { HiveConf hiveConf = new HiveConf(); hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUris); hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(), @@ -584,6 +584,34 @@ private static int findNextNestedField(String commaSplitFields) { return commaSplitFields.length(); } + /** + * Convert doris type to hive type. + */ + public static String dorisTypeToHiveType(Type dorisType) { + if (dorisType.equals(Type.BOOLEAN)) { + return "boolean"; + } else if (dorisType.equals(Type.TINYINT)) { + return "tinyint"; + } else if (dorisType.equals(Type.SMALLINT)) { + return "smallint"; + } else if (dorisType.equals(Type.INT)) { + return "int"; + } else if (dorisType.equals(Type.BIGINT)) { + return "bigint"; + } else if (dorisType.equals(Type.DATE) || dorisType.equals(Type.DATEV2)) { + return "date"; + } else if (dorisType.equals(Type.DATETIME) || dorisType.equals(Type.DATETIMEV2)) { + return "timestamp"; + } else if (dorisType.equals(Type.FLOAT)) { + return "float"; + } else if (dorisType.equals(Type.DOUBLE)) { + return "double"; + } else if (dorisType.equals(Type.STRING)) { + return "string"; + } + throw new HMSClientException("Unsupported type conversion of " + dorisType.toSql()); + } + /** * Convert hive type to doris type. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java new file mode 100644 index 00000000000000..6779a602cbfbae --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.hive; + +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.jdbc.client.JdbcClient; +import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; +import org.apache.doris.datasource.operations.ExternalMetadataOps; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class HiveMetadataOps implements ExternalMetadataOps { + private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class); + private static final int MIN_CLIENT_POOL_SIZE = 8; + private JdbcClientConfig jdbcClientConfig; + private HiveConf hiveConf; + private HMSExternalCatalog catalog; + private HMSCachedClient client; + + public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) { + this.catalog = catalog; + this.hiveConf = hiveConf; + this.jdbcClientConfig = jdbcClientConfig; + this.client = createCachedClient(hiveConf, + Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig); + } + + public HMSCachedClient getClient() { + return client; + } + + public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize, + JdbcClientConfig jdbcClientConfig) { + if (hiveConf != null) { + return new ThriftHMSCachedClient(hiveConf, thriftClientPoolSize); + } + Preconditions.checkNotNull(jdbcClientConfig, "hiveConf and jdbcClientConfig are both null"); + String dbType = JdbcClient.parseDbType(jdbcClientConfig.getJdbcUrl()); + switch (dbType) { + case JdbcResource.POSTGRESQL: + return new PostgreSQLJdbcHMSCachedClient(jdbcClientConfig); + default: + throw new IllegalArgumentException("Unsupported DB type: " + dbType); + } + } + + @Override + public void createDb(CreateDbStmt stmt) throws DdlException { + String fullDbName = stmt.getFullDbName(); + Map properties = stmt.getProperties(); + long dbId = Env.getCurrentEnv().getNextId(); + try { + HiveDatabaseMetadata catalogDatabase = new HiveDatabaseMetadata(); + catalogDatabase.setDbName(fullDbName); + catalogDatabase.setProperties(properties); + if (properties.containsKey("location_uri")) { + catalogDatabase.setLocationUri(properties.get("location_uri")); + } + catalogDatabase.setComment(properties.getOrDefault("comment", "")); + client.createDatabase(catalogDatabase); + catalog.onRefresh(true); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + LOG.info("createDb dbName = " + fullDbName + ", id = " + dbId); + } + + @Override + public void dropDb(DropDbStmt stmt) throws DdlException { + String dbName = stmt.getDbName(); + try { + client.dropDatabase(dbName); + catalog.onRefresh(true); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void createTable(CreateTableStmt stmt) throws UserException { + String dbName = 
stmt.getDbName(); + String tblName = stmt.getTableName(); + ExternalDatabase db = catalog.getDbNullable(dbName); + if (db == null) { + throw new UserException("Failed to get database: '" + dbName + "' in catalog: " + catalog.getName()); + } + try { + Map props = stmt.getExtProperties(); + String inputFormat = props.getOrDefault("input_format", Config.hive_default_input_format); + String outputFormat = props.getOrDefault("output_format", Config.hive_default_output_format); + String serDe = props.getOrDefault("serde", Config.hive_default_serde); + HiveTableMetadata catalogTable = HiveTableMetadata.of(dbName, + tblName, + stmt.getColumns(), + parsePartitionKeys(props), + props, + inputFormat, + outputFormat, + serDe); + + client.createTable(catalogTable, stmt.isSetIfNotExists()); + db.setUnInitialized(true); + } catch (Exception e) { + throw new UserException(e.getMessage(), e); + } + } + + private static List parsePartitionKeys(Map props) { + List parsedKeys = new ArrayList<>(); + String pkStr = props.getOrDefault("partition_keys", ""); + if (pkStr.isEmpty()) { + return parsedKeys; + } else { + // TODO: parse string to partition keys list + return parsedKeys; + } + } + + @Override + public void dropTable(DropTableStmt stmt) throws DdlException { + String dbName = stmt.getDbName(); + ExternalDatabase db = catalog.getDbNullable(stmt.getDbName()); + if (db == null) { + throw new DdlException("Failed to get database: '" + dbName + "' in catalog: " + catalog.getName()); + } + try { + client.dropTable(dbName, stmt.getTableName()); + db.setUnInitialized(true); + } catch (Exception e) { + throw new DdlException(e.getMessage(), e); + } + } + + @Override + public List listTableNames(String dbName) { + return client.getAllTables(dbName); + } + + @Override + public boolean tableExist(String dbName, String tblName) { + return client.tableExists(dbName, tblName); + } + + public List listDatabaseNames() { + return client.getAllDatabases(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java new file mode 100644 index 00000000000000..8edd3033187a6f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
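Note the refresh pattern in createDb/dropDb/createTable/dropTable above: rather than patching local caches entry by entry, the catalog is refreshed (onRefresh(true)) or the database is marked uninitialized (setUnInitialized(true)) so the next access re-lists from the metastore. A minimal self-contained sketch of that lazy re-initialization, with stand-in types rather than the real ExternalDatabase:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch only: lazy re-init analogous to db.setUnInitialized(true) after DDL.
class LazyDbView {
    private final Supplier<List<String>> remoteTableLister; // e.g. a metastore listTables call
    private List<String> cachedTables = new ArrayList<>();
    private boolean initialized = false;

    LazyDbView(Supplier<List<String>> remoteTableLister) {
        this.remoteTableLister = remoteTableLister;
    }

    void setUnInitialized() {
        initialized = false;          // next read re-fetches from the metastore
    }

    List<String> listTables() {
        if (!initialized) {
            cachedTables = remoteTableLister.get();
            initialized = true;
        }
        return cachedTables;
    }
}
```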
+ +package org.apache.doris.datasource.hive; + +import org.apache.doris.catalog.Column; +import org.apache.doris.datasource.TableMetadata; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; + +import java.util.List; +import java.util.Map; + +public class HiveTableMetadata implements TableMetadata { + private String dbName; + private String tableName; + private List columns; + private List partitionKeys; + private String inputFormat; + private String outputFormat; + private String serDe; + private Map properties; + // private String viewSql; + + public HiveTableMetadata(String dbName, + String tblName, + List columns, + List partitionKeys, + Map props, + String inputFormat, + String outputFormat, + String serDe) { + this.dbName = dbName; + this.tableName = tblName; + this.columns = columns; + this.partitionKeys = partitionKeys; + this.inputFormat = inputFormat; + this.outputFormat = outputFormat; + this.serDe = serDe; + this.properties = props; + } + + @Override + public String getDbName() { + return dbName; + } + + @Override + public String getTableName() { + return tableName; + } + + @Override + public Map getProperties() { + return properties; + } + + public List getColumns() { + return columns; + } + + public List getPartitionKeys() { + return partitionKeys; + } + + public String getInputFormat() { + return inputFormat; + } + + public String getOutputFormat() { + return outputFormat; + } + + public String getSerDe() { + return serDe; + } + + public static HiveTableMetadata of(String dbName, + String tblName, + List columns, + List partitionKeys, + Map props, + String inputFormat, + String outputFormat, String serDe) { + return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props, + inputFormat, outputFormat, serDe); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java index 174a15221b84a2..e587debdb3553f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java @@ -20,6 +20,8 @@ import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.DatabaseMetadata; +import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.thrift.TOdbcTableType; @@ -29,6 +31,7 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -503,4 +506,25 @@ protected String getDatabaseQuery() { protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); } + + public void createDatabase(DatabaseMetadata database) { + throw new NotImplementedException("PostgreSQL createDatabase not implemented"); + } + + public void dropDatabase(String dbName) { + throw new NotImplementedException("PostgreSQL dropDatabase not 
implemented"); + } + + public void createTable(TableMetadata hiveTable, boolean ignoreIfExists) { + throw new NotImplementedException("PostgreSQL createTable not implemented"); + } + + public void dropTable(String dbName, String tblName) { + throw new NotImplementedException("PostgreSQL dropTable not implemented"); + } + + @Override + public String getCatalogLocation(String catalogName) { + throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index f7f61a2e94956e..c8207906624eee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -18,7 +18,10 @@ package org.apache.doris.datasource.hive; import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Column; import org.apache.doris.common.Config; +import org.apache.doris.datasource.DatabaseMetadata; +import org.apache.doris.datasource.TableMetadata; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; import org.apache.doris.datasource.property.constants.HMSProperties; @@ -26,6 +29,7 @@ import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -38,6 +42,7 @@ import org.apache.hadoop.hive.metastore.LockComponentBuilder; import org.apache.hadoop.hive.metastore.LockRequestBuilder; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Catalog; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.DataOperationType; @@ -49,6 +54,8 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableValidWriteIds; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -56,11 +63,14 @@ import org.apache.logging.log4j.Logger; import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Queue; @@ -117,6 +127,151 @@ public List getAllTables(String dbName) { } } + @Override + public void createDatabase(DatabaseMetadata db) { + try (ThriftHMSClient client = getClient()) { + try { + if (db instanceof HiveDatabaseMetadata) { + HiveDatabaseMetadata hiveDb = (HiveDatabaseMetadata) db; + ugiDoAs(() -> { + client.client.createDatabase(toHiveDatabase(hiveDb)); + return null; + }); + } + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new 
HMSClientException("failed to create database from hms client", e); + } + } + + private static Database toHiveDatabase(HiveDatabaseMetadata hiveDb) { + Database database = new Database(); + database.setName(hiveDb.getDbName()); + if (StringUtils.isNotEmpty(hiveDb.getLocationUri())) { + database.setLocationUri(hiveDb.getLocationUri()); + } + database.setParameters(hiveDb.getProperties()); + database.setDescription(hiveDb.getComment()); + return database; + } + + @Override + public void createTable(TableMetadata tbl, boolean ignoreIfExists) { + if (tableExists(tbl.getDbName(), tbl.getTableName())) { + throw new HMSClientException("Table '" + tbl.getTableName() + + "' has existed in '" + tbl.getDbName() + "'."); + } + try (ThriftHMSClient client = getClient()) { + try { + // String location, + if (tbl instanceof HiveTableMetadata) { + ugiDoAs(() -> { + client.client.createTable(toHiveTable((HiveTableMetadata) tbl)); + return null; + }); + } + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to create database from hms client", e); + } + } + + private static Table toHiveTable(HiveTableMetadata hiveTable) { + Objects.requireNonNull(hiveTable.getDbName(), "Hive database name should be not null"); + Objects.requireNonNull(hiveTable.getTableName(), "Hive table name should be not null"); + Table table = new Table(); + table.setDbName(hiveTable.getDbName()); + table.setTableName(hiveTable.getTableName()); + // table.setOwner(""); + int createTime = (int) System.currentTimeMillis() * 1000; + table.setCreateTime(createTime); + table.setLastAccessTime(createTime); + // table.setRetention(0); + String location = hiveTable.getProperties().get("external_location"); + table.setSd(toHiveStorageDesc(hiveTable.getColumns(), + hiveTable.getInputFormat(), + hiveTable.getOutputFormat(), + hiveTable.getSerDe(), + location)); + table.setPartitionKeys(hiveTable.getPartitionKeys()); + // table.setViewOriginalText(hiveTable.getViewSql()); + // table.setViewExpandedText(hiveTable.getViewSql()); + table.setTableType("MANAGED_TABLE"); + table.setParameters(hiveTable.getProperties()); + return table; + } + + private static StorageDescriptor toHiveStorageDesc(List columns, String inputFormat, String outputFormat, + String serDe, String location) { + StorageDescriptor sd = new StorageDescriptor(); + sd.setCols(toHiveColumns(columns)); + SerDeInfo serDeInfo = new SerDeInfo(); + serDeInfo.setSerializationLib(serDe); + sd.setSerdeInfo(serDeInfo); + sd.setInputFormat(inputFormat); + sd.setOutputFormat(outputFormat); + if (StringUtils.isNotEmpty(location)) { + sd.setLocation(location); + } + Map parameters = new HashMap<>(); + parameters.put("tag", "doris external hive talbe"); + sd.setParameters(parameters); + return sd; + } + + private static List toHiveColumns(List columns) { + List result = new ArrayList<>(); + for (Column column : columns) { + FieldSchema hiveFieldSchema = new FieldSchema(); + // TODO: add doc, just support doris type + hiveFieldSchema.setType(HiveMetaStoreClientHelper.dorisTypeToHiveType(column.getType())); + hiveFieldSchema.setName(column.getName()); + hiveFieldSchema.setComment(column.getComment()); + result.add(hiveFieldSchema); + } + return result; + } + + @Override + public void dropDatabase(String dbName) { + try (ThriftHMSClient client = getClient()) { + try { + ugiDoAs(() -> { + client.client.dropDatabase(dbName); + return null; + }); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch 
(Exception e) { + throw new HMSClientException("failed to drop database from hms client", e); + } + } + + @Override + public void dropTable(String dbName, String tblName) { + try (ThriftHMSClient client = getClient()) { + try { + ugiDoAs(() -> { + client.client.dropTable(dbName, tblName); + return null; + }); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to drop table from hms client", e); + } + } + @Override public boolean tableExists(String dbName, String tblName) { try (ThriftHMSClient client = getClient()) { @@ -474,6 +629,21 @@ private ThriftHMSClient getClient() throws MetaException { } } + @Override + public String getCatalogLocation(String catalogName) { + try (ThriftHMSClient client = getClient()) { + try { + Catalog catalog = ugiDoAs(() -> client.client.getCatalog(catalogName)); + return catalog.getLocationUri(); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to get location for %s from hms client", e, catalogName); + } + } + private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) { return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java index 0d3cc2edc41f91..c4529d5d48747e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java @@ -87,8 +87,8 @@ private void processRename() throws DdlException { catalogName, dbAfter.getName()); return; } - Env.getCurrentEnv().getCatalogMgr().dropExternalDatabase(dbBefore.getName(), catalogName, true); - Env.getCurrentEnv().getCatalogMgr().createExternalDatabase(dbAfter.getName(), catalogName, true); + Env.getCurrentEnv().getCatalogMgr().unregisterExternalDatabase(dbBefore.getName(), catalogName, true); + Env.getCurrentEnv().getCatalogMgr().registerExternalDatabase(dbAfter.getName(), catalogName, true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java index c69812a22b30db..1567960b7f5b58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -98,9 +98,9 @@ private void processRecreateTable() throws DdlException { return; } Env.getCurrentEnv().getCatalogMgr() - .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); + .unregisterExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent( + .registerExternalTableFromEvent( tableAfter.getDbName(), tableAfter.getTableName(), catalogName, eventTime, true); } @@ -117,9 +117,9 @@ private void processRename() throws DdlException { return; } Env.getCurrentEnv().getCatalogMgr() - .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); + .unregisterExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent( + .registerExternalTableFromEvent( tableAfter.getDbName(), 
tableAfter.getTableName(), catalogName, eventTime, true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java index d79d23824ab163..8829d2ae98831a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java @@ -55,8 +55,7 @@ protected static List getEvents(NotificationEvent event, protected void process() throws MetastoreNotificationException { try { infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName); - Env.getCurrentEnv().getCatalogMgr() - .createExternalDatabase(dbName, catalogName, true); + Env.getCurrentEnv().getCatalogMgr().registerExternalDatabase(dbName, catalogName, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java index 246ce8626f4aca..e6c3e2e7eae9d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java @@ -80,7 +80,7 @@ protected void process() throws MetastoreNotificationException { try { infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName); Env.getCurrentEnv().getCatalogMgr() - .createExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, eventTime, true); + .registerExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, eventTime, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java index ca69e6f14d015a..107ce591a42b03 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropDatabaseEvent.java @@ -55,7 +55,7 @@ protected void process() throws MetastoreNotificationException { try { infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName); Env.getCurrentEnv().getCatalogMgr() - .dropExternalDatabase(dbName, catalogName, true); + .unregisterExternalDatabase(dbName, catalogName, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java index 0f62e2460820ad..6dcb16dedad369 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java @@ -78,7 +78,7 @@ protected boolean willChangeTableName() { protected void process() throws MetastoreNotificationException { try { infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tableName); - Env.getCurrentEnv().getCatalogMgr().dropExternalTable(dbName, tableName, catalogName, true); + Env.getCurrentEnv().getCatalogMgr().unregisterExternalTable(dbName, tableName, 
catalogName, true); } catch (DdlException e) { throw new MetastoreNotificationException( debugString("Failed to process event"), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java new file mode 100644 index 00000000000000..d6370c583dafbe --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/DorisTypeToIcebergType.java @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.MapType; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.datasource.DorisTypeVisitor; + +import com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; + +import java.util.List; + + +/** + * Convert Doris type to Iceberg type + */ +public class DorisTypeToIcebergType extends DorisTypeVisitor { + private final StructType root; + private int nextId = 0; + + public DorisTypeToIcebergType() { + this.root = null; + } + + public DorisTypeToIcebergType(StructType root) { + this.root = root; + // the root struct's fields use the first ids + this.nextId = root.getFields().size(); + } + + private int getNextId() { + int next = nextId; + nextId += 1; + return next; + } + + @Override + public Type struct(StructType struct, List types) { + List fields = struct.getFields(); + List newFields = Lists.newArrayListWithExpectedSize(fields.size()); + boolean isRoot = root == struct; + for (int i = 0; i < fields.size(); i++) { + StructField field = fields.get(i); + Type type = types.get(i); + + int id = isRoot ? 
i : getNextId(); + if (field.getContainsNull()) { + newFields.add(Types.NestedField.optional(id, field.getName(), type, field.getComment())); + } else { + newFields.add(Types.NestedField.required(id, field.getName(), type, field.getComment())); + } + } + return Types.StructType.of(newFields); + } + + @Override + public Type field(StructField field, Type typeResult) { + return typeResult; + } + + @Override + public Type array(ArrayType array, Type elementType) { + if (array.getContainsNull()) { + return Types.ListType.ofOptional(getNextId(), elementType); + } else { + return Types.ListType.ofRequired(getNextId(), elementType); + } + } + + @Override + public Type map(MapType map, Type keyType, Type valueType) { + if (map.getIsValueContainsNull()) { + return Types.MapType.ofOptional(getNextId(), getNextId(), keyType, valueType); + } else { + return Types.MapType.ofRequired(getNextId(), getNextId(), keyType, valueType); + } + } + + @Override + public Type atomic(org.apache.doris.catalog.Type atomic) { + PrimitiveType primitiveType = atomic.getPrimitiveType(); + if (primitiveType.equals(PrimitiveType.BOOLEAN)) { + return Types.BooleanType.get(); + } else if (primitiveType.equals(PrimitiveType.TINYINT) + || primitiveType.equals(PrimitiveType.SMALLINT) + || primitiveType.equals(PrimitiveType.INT)) { + return Types.IntegerType.get(); + } else if (primitiveType.equals(PrimitiveType.BIGINT) + || primitiveType.equals(PrimitiveType.LARGEINT)) { + return Types.LongType.get(); + } else if (primitiveType.equals(PrimitiveType.FLOAT)) { + return Types.FloatType.get(); + } else if (primitiveType.equals(PrimitiveType.DOUBLE)) { + return Types.DoubleType.get(); + } else if (primitiveType.equals(PrimitiveType.CHAR) + || primitiveType.equals(PrimitiveType.VARCHAR) + || primitiveType.equals(PrimitiveType.STRING)) { + return Types.StringType.get(); + } else if (primitiveType.equals(PrimitiveType.DATE) + || primitiveType.equals(PrimitiveType.DATEV2)) { + return Types.DateType.get(); + } else if (primitiveType.equals(PrimitiveType.TIME) + || primitiveType.equals(PrimitiveType.TIMEV2)) { + return Types.TimeType.get(); + } else if (primitiveType.equals(PrimitiveType.DECIMALV2) + || primitiveType.isDecimalV3Type()) { + return Types.DecimalType.of( + ((ScalarType) atomic).getScalarPrecision(), + ((ScalarType) atomic).getScalarScale()); + } else if (primitiveType.equals(PrimitiveType.DATETIME) + || primitiveType.equals(PrimitiveType.DATETIMEV2)) { + return Types.TimestampType.withoutZone(); + } + // unsupported type: PrimitiveType.HLL BITMAP BINARY + + throw new UnsupportedOperationException( + "Not a supported type: " + primitiveType); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java index a243a17b31a979..e4d8b2f55c43f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergDLFExternalCatalog.java @@ -37,7 +37,7 @@ public IcebergDLFExternalCatalog(long catalogId, String name, String resource, M } @Override - protected void initLocalObjectsImpl() { + protected void initCatalog() { icebergCatalogType = ICEBERG_DLF; DLFCatalog dlfCatalog = new DLFCatalog(); dlfCatalog.setConf(getConfiguration()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java index fadc60913bed0c..426657bc539d86 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java @@ -17,27 +17,17 @@ package org.apache.doris.datasource.iceberg; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.FeNameFormat; -import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.SupportsNamespaces; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.List; -import java.util.stream.Collectors; public abstract class IcebergExternalCatalog extends ExternalCatalog { - private static final Logger LOG = LogManager.getLogger(IcebergExternalCatalog.class); public static final String ICEBERG_CATALOG_TYPE = "iceberg.catalog.type"; public static final String ICEBERG_REST = "rest"; public static final String ICEBERG_HMS = "hms"; @@ -46,7 +36,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog { public static final String ICEBERG_DLF = "dlf"; protected String icebergCatalogType; protected Catalog catalog; - protected SupportsNamespaces nsCatalog; public IcebergExternalCatalog(long catalogId, String name, String comment) { super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment); @@ -54,18 +43,21 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) { @Override protected void init() { - nsCatalog = (SupportsNamespaces) catalog; super.init(); } - public Catalog getCatalog() { - makeSureInitialized(); - return catalog; + // Create catalog based on catalog type + protected abstract void initCatalog(); + + @Override + protected void initLocalObjectsImpl() { + initCatalog(); + metadataOps = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog); } - public SupportsNamespaces getNsCatalog() { + public Catalog getCatalog() { makeSureInitialized(); - return nsCatalog; + return ((IcebergMetadataOps) metadataOps).getCatalog(); } public String getIcebergCatalogType() { @@ -73,39 +65,15 @@ public String getIcebergCatalogType() { return icebergCatalogType; } - protected List listDatabaseNames() { - return nsCatalog.listNamespaces().stream() - .map(e -> { - String dbName = e.toString(); - try { - FeNameFormat.checkDbName(dbName); - } catch (AnalysisException ex) { - Util.logAndThrowRuntimeException(LOG, - String.format("Not a supported namespace name format: %s", dbName), ex); - } - return dbName; - }) - .collect(Collectors.toList()); - } - @Override public boolean tableExist(SessionContext ctx, String dbName, String tblName) { makeSureInitialized(); - return catalog.tableExists(TableIdentifier.of(dbName, tblName)); + return metadataOps.tableExist(dbName, tblName); } @Override public List listTableNames(SessionContext ctx, String dbName) { makeSureInitialized(); - List tableIdentifiers = catalog.listTables(Namespace.of(dbName)); - return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList()); - } - - 
public org.apache.iceberg.Table getIcebergTable(String dbName, String tblName) { - makeSureInitialized(); - return Env.getCurrentEnv() - .getExtMetaCacheMgr() - .getIcebergMetadataCache() - .getIcebergTable(this, dbName, tblName); + return metadataOps.listTableNames(dbName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalDatabase.java index 5f1fb6914677f4..5bc31e31cf41f3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalDatabase.java @@ -21,42 +21,14 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.InitDatabaseLog; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - public class IcebergExternalDatabase extends ExternalDatabase { - private static final Logger LOG = LogManager.getLogger(IcebergExternalDatabase.class); - public IcebergExternalDatabase(ExternalCatalog extCatalog, Long id, String name) { super(extCatalog, id, name, InitDatabaseLog.Type.ICEBERG); } @Override - protected IcebergExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + protected IcebergExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) { return new IcebergExternalTable(tblId, tableName, name, (IcebergExternalCatalog) extCatalog); } - - @Override - public void dropTable(String tableName) { - if (LOG.isDebugEnabled()) { - LOG.debug("drop table [{}]", tableName); - } - Long tableId = tableNameToId.remove(tableName); - if (tableId == null) { - LOG.warn("drop table [{}] failed", tableName); - } - idToTbl.remove(tableId); - } - - @Override - public void createTable(String tableName, long tableId) { - if (LOG.isDebugEnabled()) { - LOG.debug("create table [{}]", tableName); - } - tableNameToId.put(tableName, tableId); - IcebergExternalTable table = new IcebergExternalTable(tableId, tableName, name, - (IcebergExternalCatalog) extCatalog); - idToTbl.put(tableId, table); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java index dfc78f44944ba6..eef679c6f1314e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java @@ -35,6 +35,7 @@ import java.util.Optional; public class IcebergExternalTable extends ExternalTable { + public IcebergExternalTable(long id, String name, String dbName, IcebergExternalCatalog catalog) { super(id, name, catalog, dbName, TableType.ICEBERG_EXTERNAL_TABLE); } @@ -52,7 +53,6 @@ protected synchronized void makeSureInitialized() { @Override public List initSchema() { - makeSureInitialized(); return IcebergUtils.getSchema(catalog, dbName, name); } @@ -79,7 +79,7 @@ public Optional getColumnStatistic(String colName) { makeSureInitialized(); return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> StatisticsUtil.getIcebergColumnStats(colName, - ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name))); + IcebergUtils.getIcebergTable(catalog, dbName, name))); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java index a1be776292da41..08e7fe044ba4d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergGlueExternalCatalog.java @@ -25,11 +25,9 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.aws.glue.GlueCatalog; import org.apache.iceberg.aws.s3.S3FileIOProperties; -import org.apache.iceberg.catalog.Namespace; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; public class IcebergGlueExternalCatalog extends IcebergExternalCatalog { @@ -44,7 +42,7 @@ public IcebergGlueExternalCatalog(long catalogId, String name, String resource, } @Override - protected void initLocalObjectsImpl() { + protected void initCatalog() { icebergCatalogType = ICEBERG_GLUE; GlueCatalog glueCatalog = new GlueCatalog(); glueCatalog.setConf(getConfiguration()); @@ -63,8 +61,6 @@ protected void initLocalObjectsImpl() { @Override protected List listDatabaseNames() { - return nsCatalog.listNamespaces().stream() - .map(Namespace::toString) - .collect(Collectors.toList()); + return metadataOps.listDatabaseNames(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java index 0300477361bf46..875633da2933e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHMSExternalCatalog.java @@ -17,10 +17,16 @@ package org.apache.doris.datasource.iceberg; +import org.apache.doris.common.Config; +import org.apache.doris.common.security.authentication.AuthenticationConfig; +import org.apache.doris.common.security.authentication.HadoopUGI; import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.hive.HMSCachedClient; +import org.apache.doris.datasource.hive.HiveMetadataOps; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.HMSProperties; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.hive.HiveCatalog; @@ -37,15 +43,26 @@ public IcebergHMSExternalCatalog(long catalogId, String name, String resource, M } @Override - protected void initLocalObjectsImpl() { + protected void initCatalog() { icebergCatalogType = ICEBERG_HMS; HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); hiveCatalog.setConf(getConfiguration()); // initialize hive catalog Map catalogProperties = new HashMap<>(); String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, ""); - catalogProperties.put(CatalogProperties.URI, metastoreUris); + HiveConf hiveConf = new HiveConf(); + for (Map.Entry kv : catalogProperty.getHadoopProperties().entrySet()) { + hiveConf.set(kv.getKey(), kv.getValue()); + } + hiveConf.set(HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(), + String.valueOf(Config.hive_metastore_client_timeout_second)); + HadoopUGI.tryKrbLogin(this.getName(), AuthenticationConfig.getKerberosConfig(hiveConf, + AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, + AuthenticationConfig.HADOOP_KERBEROS_KEYTAB)); + HMSCachedClient cachedClient = HiveMetadataOps.createCachedClient(hiveConf, 1, null); + String location = 
cachedClient.getCatalogLocation("hive"); + catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, location); hiveCatalog.initialize(icebergCatalogType, catalogProperties); catalog = hiveCatalog; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java index 3101ad78c021f7..83bc70ce348966 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergHadoopExternalCatalog.java @@ -52,7 +52,7 @@ public IcebergHadoopExternalCatalog(long catalogId, String name, String resource } @Override - protected void initLocalObjectsImpl() { + protected void initCatalog() { icebergCatalogType = ICEBERG_HADOOP; HadoopCatalog hadoopCatalog = new HadoopCatalog(); Configuration conf = getConfiguration(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index aebc385f50f670..10fa205b095133 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -102,14 +102,6 @@ public Table getIcebergTable(CatalogIf catalog, String dbName, String tbName) { return icebergTable; } - private Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName, - Map props) { - Table table = HiveMetaStoreClientHelper.ugiDoAs(catalogId, - () -> catalog.loadTable(TableIdentifier.of(dbName, tbName))); - initIcebergTableFileIO(table, props); - return table; - } - public void invalidateCatalogCache(long catalogId) { snapshotListCache.asMap().keySet().stream() .filter(key -> key.catalogId == catalogId) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java new file mode 100644 index 00000000000000..85ca2c957c0ee8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -0,0 +1,148 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
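A usage sketch (hypothetical, not part of the patch) of the "partition" table property that IcebergMetadataOps.createTable below feeds into IcebergUtils.solveIcebergPartitionSpec (see the IcebergUtils hunk further down). The property value is the example from that method's own comment; `schema` stands for the Iceberg Schema already derived from the Doris columns.

// identity, time-transform and bucket partitions expressed in one property string
Map<String, String> properties = new HashMap<>();
properties.put("partition", "c1;day(c1);bucket(4,c3)");
// throws UserException on a malformed or unsupported entry
PartitionSpec spec = IcebergUtils.solveIcebergPartitionSpec(properties, schema);
// the call removes the "partition" entry, so the remaining map can be passed
// straight to catalog.createTable(...)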
+ +package org.apache.doris.datasource.iceberg; + +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.StructField; +import org.apache.doris.catalog.StructType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeNameFormat; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.DorisTypeVisitor; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.operations.ExternalMetadataOps; + +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class IcebergMetadataOps implements ExternalMetadataOps { + + private static final Logger LOG = LogManager.getLogger(IcebergMetadataOps.class); + protected Catalog catalog; + protected IcebergExternalCatalog dorisCatalog; + protected SupportsNamespaces nsCatalog; + + public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) { + this.dorisCatalog = dorisCatalog; + this.catalog = catalog; + nsCatalog = (SupportsNamespaces) catalog; + } + + public Catalog getCatalog() { + return catalog; + } + + @Override + public boolean tableExist(String dbName, String tblName) { + return catalog.tableExists(TableIdentifier.of(dbName, tblName)); + } + + public List listDatabaseNames() { + return nsCatalog.listNamespaces().stream() + .map(e -> { + String dbName = e.toString(); + try { + FeNameFormat.checkDbName(dbName); + } catch (AnalysisException ex) { + Util.logAndThrowRuntimeException(LOG, + String.format("Not a supported namespace name format: %s", dbName), ex); + } + return dbName; + }) + .collect(Collectors.toList()); + } + + @Override + public List listTableNames(String dbName) { + List tableIdentifiers = catalog.listTables(Namespace.of(dbName)); + return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList()); + } + + @Override + public void createDb(CreateDbStmt stmt) throws DdlException { + SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; + String dbName = stmt.getFullDbName(); + Map properties = stmt.getProperties(); + nsCatalog.createNamespace(Namespace.of(dbName), properties); + dorisCatalog.onRefresh(true); + } + + @Override + public void dropDb(DropDbStmt stmt) throws DdlException { + SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog; + String dbName = stmt.getDbName(); + if (dorisCatalog.getDbNameToId().containsKey(dbName)) { + Long aLong = dorisCatalog.getDbNameToId().get(dbName); + dorisCatalog.getIdToDb().remove(aLong); + dorisCatalog.getDbNameToId().remove(dbName); + } + nsCatalog.dropNamespace(Namespace.of(dbName)); + dorisCatalog.onRefresh(true); + } + + @Override + public void createTable(CreateTableStmt stmt) throws UserException { + String dbName = stmt.getDbName(); + ExternalDatabase db = dorisCatalog.getDbNullable(dbName); + if (db == null) { + throw new 
UserException("Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName()); + } + String tableName = stmt.getTableName(); + List columns = stmt.getColumns(); + List collect = columns.stream() + .map(col -> new StructField(col.getName(), col.getType(), col.getComment(), col.isAllowNull())) + .collect(Collectors.toList()); + StructType structType = new StructType(new ArrayList<>(collect)); + org.apache.iceberg.types.Type visit = + DorisTypeVisitor.visit(structType, new DorisTypeToIcebergType(structType)); + Schema schema = new Schema(visit.asNestedType().asStructType().fields()); + Map properties = stmt.getProperties(); + PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(properties, schema); + catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties); + db.setUnInitialized(true); + } + + @Override + public void dropTable(DropTableStmt stmt) throws DdlException { + String dbName = stmt.getDbName(); + ExternalDatabase db = dorisCatalog.getDbNullable(dbName); + if (db == null) { + throw new DdlException("Failed to get database: '" + dbName + "' in catalog: " + dorisCatalog.getName()); + } + String tableName = stmt.getTableName(); + catalog.dropTable(TableIdentifier.of(dbName, tableName)); + db.setUnInitialized(true); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java index aefdfb65ceaadb..e839b9a00177c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java @@ -42,7 +42,7 @@ public IcebergRestExternalCatalog(long catalogId, String name, String resource, } @Override - protected void initLocalObjectsImpl() { + protected void initCatalog() { icebergCatalogType = ICEBERG_REST; Configuration conf = replaceS3Properties(getConfiguration()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 1102527fa3a54e..089da30f7c430f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -17,7 +17,6 @@ package org.apache.doris.datasource.iceberg; - import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.BoolLiteral; import org.apache.doris.analysis.CastExpr; @@ -38,12 +37,14 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; import org.apache.doris.thrift.TExprOpcode; import com.google.common.collect.Lists; +import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; @@ -57,13 +58,22 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Iceberg utils */ public class IcebergUtils { private static final Logger LOG = LogManager.getLogger(IcebergUtils.class); - private static long MILLIS_TO_NANO_TIME = 1000; + private 
static ThreadLocal columnIdThreadLocal = new ThreadLocal() { + @Override + public Integer initialValue() { + return 0; + } + }; + static long MILLIS_TO_NANO_TIME = 1000; + private static final Pattern PARTITION_REG = Pattern.compile("(\\w+)\\((\\d+)?,?(\\w+)\\)"); // https://iceberg.apache.org/spec/#schemas-and-data-types // All time and timestamp values are stored with microsecond precision private static final int ICEBERG_DATETIME_SCALE_MS = 6; @@ -252,6 +262,59 @@ private static SlotRef convertDorisExprToSlotRef(Expr expr) { return slotRef; } + // "partition"="c1;day(c1);bucket(4,c3)" + public static PartitionSpec solveIcebergPartitionSpec(Map properties, Schema schema) + throws UserException { + if (properties.containsKey("partition")) { + PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); + String par = properties.get("partition").replaceAll(" ", ""); + String[] pars = par.split(";"); + for (String func : pars) { + if (func.contains("(")) { + Matcher matcher = PARTITION_REG.matcher(func); + if (matcher.matches()) { + switch (matcher.group(1).toLowerCase()) { + case "bucket": + builder.bucket(matcher.group(3), Integer.parseInt(matcher.group(2))); + break; + case "year": + case "years": + builder.year(matcher.group(3)); + break; + case "month": + case "months": + builder.month(matcher.group(3)); + break; + case "date": + case "day": + case "days": + builder.day(matcher.group(3)); + break; + case "date_hour": + case "hour": + case "hours": + builder.hour(matcher.group(3)); + break; + case "truncate": + builder.truncate(matcher.group(3), Integer.parseInt(matcher.group(2))); + break; + default: + throw new UserException("unsupported partition for " + matcher.group(1)); + } + } else { + throw new UserException("failed to get partition info from " + func); + } + } else { + builder.identity(func); + } + } + properties.remove("partition"); + return builder.build(); + } else { + return PartitionSpec.unpartitioned(); + } + } + private static Type icebergPrimitiveTypeToDorisType(org.apache.iceberg.types.Type.PrimitiveType primitive) { switch (primitive.typeId()) { case BOOLEAN: @@ -301,15 +364,19 @@ public static Type icebergTypeToDorisType(org.apache.iceberg.types.Type type) { } } + public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog catalog, String dbName, String tblName) { + return Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getIcebergMetadataCache() + .getIcebergTable(catalog, dbName, tblName); + } + /** * Get iceberg schema from catalog and convert them to doris schema */ public static List getSchema(ExternalCatalog catalog, String dbName, String name) { return HiveMetaStoreClientHelper.ugiDoAs(catalog.getConfiguration(), () -> { - org.apache.iceberg.Table icebergTable = Env.getCurrentEnv() - .getExtMetaCacheMgr() - .getIcebergMetadataCache() - .getIcebergTable(catalog, dbName, name); + org.apache.iceberg.Table icebergTable = getIcebergTable(catalog, dbName, name); Schema schema = icebergTable.schema(); List columns = schema.columns(); List tmpSchema = Lists.newArrayListWithCapacity(columns.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaDatabase.java index 843083076e8f9a..d22b57dc907b1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaDatabase.java @@ -44,7 
+44,7 @@ public static List listTableNames() { } @Override - protected ExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + protected ExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) { return new ExternalInfoSchemaTable(tblId, tableName, catalog); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalDatabase.java index 29d95038c2ee5c..0bd39f8e3ed243 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalDatabase.java @@ -35,7 +35,7 @@ public JdbcExternalDatabase(ExternalCatalog extCatalog, long id, String name) { } @Override - protected JdbcExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + protected JdbcExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) { return new JdbcExternalTable(tblId, tableName, name, (JdbcExternalCatalog) extCatalog); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalDatabase.java index 98bb28aecbf080..750f1ecf55518e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalDatabase.java @@ -37,7 +37,7 @@ public MaxComputeExternalDatabase(ExternalCatalog extCatalog, long id, String na } @Override - protected MaxComputeExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + protected MaxComputeExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) { return new MaxComputeExternalTable(tblId, tableName, name, (MaxComputeExternalCatalog) extCatalog); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java new file mode 100644 index 00000000000000..4a2757f918f294 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
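A hypothetical sketch (the HMS-side wiring is not shown in this hunk) of how a catalog implementation is expected to obtain its ops object through the factory declared next, mirroring what IcebergExternalCatalog.initLocalObjectsImpl does above:

// inside an HMSExternalCatalog-style class; hiveConf and jdbcClientConfig are assumed
// to be built from the catalog properties, and at most one of them is null
@Override
protected void initLocalObjectsImpl() {
    metadataOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
}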
+ +package org.apache.doris.datasource.operations; + +import org.apache.doris.datasource.hive.HMSExternalCatalog; +import org.apache.doris.datasource.hive.HiveMetadataOps; +import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.datasource.iceberg.IcebergMetadataOps; +import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.catalog.Catalog; + + +public class ExternalMetadataOperations { + + public static HiveMetadataOps newHiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, + HMSExternalCatalog catalog) { + return new HiveMetadataOps(hiveConf, jdbcClientConfig, catalog); + } + + public static IcebergMetadataOps newIcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) { + return new IcebergMetadataOps(dorisCatalog, catalog); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java new file mode 100644 index 00000000000000..2d9498d0b91885 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
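For reference, a small hedged example of calling the interface declared next; dropTableIfExists is a hypothetical helper, not part of this change, and it relies only on methods the interface defines:

// guard a drop with an existence check; both calls go to the external metastore
static void dropTableIfExists(ExternalMetadataOps ops, DropTableStmt stmt) throws DdlException {
    if (ops.tableExist(stmt.getDbName(), stmt.getTableName())) {
        ops.dropTable(stmt);
    }
}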
+ +package org.apache.doris.datasource.operations; + +import org.apache.doris.analysis.CreateDbStmt; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropDbStmt; +import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; + +import java.util.List; + +/** + * All external metadata operations (DDL against external catalogs) go through this interface. + */ +public interface ExternalMetadataOps { + + /** + * create db in external metastore + * @param stmt + * @throws DdlException + */ + void createDb(CreateDbStmt stmt) throws DdlException; + + /** + * drop db in external metastore + * @param stmt + * @throws DdlException + */ + void dropDb(DropDbStmt stmt) throws DdlException; + + /** + * create table in external metastore + * @param stmt + * @throws UserException + */ + void createTable(CreateTableStmt stmt) throws UserException; + + /** + * drop table in external metastore + * @param stmt + * @throws DdlException + */ + void dropTable(DropTableStmt stmt) throws DdlException; + + /** + * list all database names in external metastore + * @return + */ + List<String> listDatabaseNames(); + + /** + * list all table names in the given database + * @param db + * @return + */ + List<String> listTableNames(String db); + + /** + * check whether the table exists in external metastore + * @param dbName + * @param tblName + * @return + */ + boolean tableExist(String dbName, String tblName); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalDatabase.java index b6d39da573df48..fc0b614920f949 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalDatabase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalDatabase.java @@ -21,42 +21,14 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.InitDatabaseLog; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - public class PaimonExternalDatabase extends ExternalDatabase { - private static final Logger LOG = LogManager.getLogger(PaimonExternalDatabase.class); - public PaimonExternalDatabase(ExternalCatalog extCatalog, Long id, String name) { super(extCatalog, id, name, InitDatabaseLog.Type.PAIMON); } @Override - protected PaimonExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) { + protected PaimonExternalTable newExternalTable(String tableName, long tblId, ExternalCatalog catalog) { return new PaimonExternalTable(tblId, tableName, name, (PaimonExternalCatalog) extCatalog); } - - @Override - public void dropTable(String tableName) { - if (LOG.isDebugEnabled()) { - LOG.debug("drop table [{}]", tableName); - } - Long tableId = tableNameToId.remove(tableName); - if (tableId == null) { - LOG.warn("drop table [{}] failed", tableName); - } - idToTbl.remove(tableId); - } - - @Override - public void createTable(String tableName, long tableId) { - if (LOG.isDebugEnabled()) { - LOG.debug("create table [{}]", tableName); - } - tableNameToId.put(tableName, tableId); - PaimonExternalTable table = new PaimonExternalTable(tableId, tableName, name, - (PaimonExternalCatalog) extCatalog); - idToTbl.put(tableId, table); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java index f9b315cc5c09d6..33859dc5f30627 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java @@ -264,7 +264,7 @@ private static void setS3FsAccess(Map s3Properties, Map mysqlProp = Maps.newHashMap(); @@ -279,7 +279,7 @@ public static Database mockDb() throws UserException { } catch (DdlException e) { e.printStackTrace(); } - db.createTable(mysqlTable); + db.registerTable(mysqlTable); // 3. range partition olap table MaterializedIndex baseIndexP1 = new MaterializedIndex(TEST_TBL2_ID, IndexState.NORMAL); @@ -387,7 +387,7 @@ public static Database mockDb() throws UserException { olapTable2.setIndexMeta(TEST_ROLLUP_ID, TEST_ROLLUP_NAME, TEST_ROLLUP_SCHEMA, 0, ROLLUP_SCHEMA_HASH, (short) 1, TStorageType.COLUMN, KeysType.AGG_KEYS); - db.createTable(olapTable2); + db.registerTable(olapTable2); return db; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index d361777fdd56c3..ff60a6e8b9063b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -244,7 +244,7 @@ boolean await(long timeout, TimeUnit unit) { } // drop this table, cause we want to try restoring this table - db.dropTable(expectedRestoreTbl.getName()); + db.unregisterTable(expectedRestoreTbl.getName()); job = new RestoreJob(label, "2018-01-01 01:01:01", db.getId(), db.getFullName(), jobInfo, false, new ReplicaAllocation((short) 3), 100000, -1, false, false, false, env, repo.getId()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java index 9d35bbcd7a2698..93b7d95dbfc645 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java @@ -230,7 +230,7 @@ public static Database createSimpleDb(long dbId, long tableId, long partitionId, table.setBaseIndexId(indexId); // db Database db = new Database(dbId, testDb1); - db.createTable(table); + db.registerTable(table); // add a es table to catalog try { @@ -288,7 +288,7 @@ public static void createDupTable(Database db) { TStorageType.COLUMN, KeysType.DUP_KEYS); table.setBaseIndexId(testIndexId2); // db - db.createTable(table); + db.registerTable(table); } public static void createEsTable(Database db) throws DdlException { @@ -319,7 +319,7 @@ public static void createEsTable(Database db) throws DdlException { properties.put(EsResource.KEYWORD_SNIFF, "true"); EsTable esTable = new EsTable(testEsTableId1, testEsTable1, columns, properties, partitionInfo); - db.createTable(esTable); + db.registerTable(esTable); } public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java index 3f8e2eb08ae803..07ce2f9229b8de 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java @@ -122,8 +122,8 @@ public void getTablesOnIdOrderOrThrowExceptionTest() throws MetaNotFoundExceptio List baseSchema2 = new LinkedList<>(); OlapTable table2 = new OlapTable(2001L, "baseTable2", baseSchema2, KeysType.DUP_KEYS, new SinglePartitionInfo(), new RandomDistributionInfo(10)); - db.createTable(table1); - db.createTable(table2); + 
db.registerTable(table1); + db.registerTable(table2); List tableIdList = Lists.newArrayList(2001L, 2000L); List tableList = db.getTablesOnIdOrderOrThrowException(tableIdList); Assert.assertEquals(2, tableList.size()); @@ -138,7 +138,7 @@ public void getTableOrThrowExceptionTest() throws MetaNotFoundException { List baseSchema = new LinkedList<>(); OlapTable table = new OlapTable(2000L, "baseTable", baseSchema, KeysType.AGG_KEYS, new SinglePartitionInfo(), new RandomDistributionInfo(10)); - db.createTable(table); + db.registerTable(table); Table resultTable1 = db.getTableOrMetaException(2000L, Table.TableType.OLAP); Table resultTable2 = db.getTableOrMetaException("baseTable", Table.TableType.OLAP); Assert.assertEquals(table, resultTable1); @@ -168,9 +168,9 @@ public void createAndDropPartitionTest() { table.addPartition(partition); // create - Assert.assertTrue(db.createTable(table)); + Assert.assertTrue(db.registerTable(table)); // duplicate - Assert.assertFalse(db.createTable(table)); + Assert.assertFalse(db.registerTable(table)); Assert.assertEquals(table, db.getTableNullable(table.getId())); Assert.assertEquals(table, db.getTableNullable(table.getName())); @@ -185,11 +185,11 @@ public void createAndDropPartitionTest() { // drop // drop not exist tableFamily - db.dropTable("invalid"); + db.unregisterTable("invalid"); Assert.assertEquals(1, db.getTables().size()); - db.createTable(table); - db.dropTable(table.getName()); + db.registerTable(table); + db.unregisterTable(table.getName()); Assert.assertEquals(0, db.getTables().size()); } @@ -234,7 +234,7 @@ public void testSerialization() throws Exception { table.setIndexMeta(1L, "test", column, 1, 1, shortKeyColumnCount, TStorageType.COLUMN, KeysType.AGG_KEYS); Deencapsulation.setField(table, "baseIndexId", 1); table.addPartition(partition); - db2.createTable(table); + db2.registerTable(table); db2.write(dos); dos.flush(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java index 9abb1322e6dc54..7b1d595c45f166 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/InfoSchemaDbTest.java @@ -29,9 +29,9 @@ public class InfoSchemaDbTest { public void testNormal() throws IOException, DdlException { Database db = new InfoSchemaDb(); - Assert.assertFalse(db.createTable(null)); + Assert.assertFalse(db.registerTable(null)); Assert.assertFalse(db.createTableWithLock(null, false, false).first); - db.dropTable("authors"); + db.unregisterTable("authors"); Assert.assertThrows(IOException.class, () -> db.write(null)); Assert.assertNull(db.getTableNullable("authors")); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/MysqlDbTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/MysqlDbTest.java index 788780202b44d8..6e17867b4d3b2b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/MysqlDbTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/MysqlDbTest.java @@ -29,9 +29,9 @@ public class MysqlDbTest { public void testNormal() throws IOException, DdlException { Database db = new MysqlDb(); - Assert.assertFalse(db.createTable(null)); + Assert.assertFalse(db.registerTable(null)); Assert.assertFalse(db.createTableWithLock(null, false, false).first); - db.dropTable("authors"); + db.unregisterTable("authors"); Assert.assertThrows(IOException.class, () -> db.write(null)); Assert.assertNull(db.getTableNullable("authors")); } diff 
--git a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java index 1115a478b15a80..62ba34cf4e3d47 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/DiskRebalanceTest.java @@ -171,7 +171,7 @@ public void testDiskRebalancerWithSameUsageDisk() { olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS, new RangePartitionInfo(), new HashDistributionInfo()); - db.createTable(olapTable); + db.registerTable(olapTable); // 1 table, 3 partitions p0,p1,p2 MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null); @@ -214,7 +214,7 @@ public void testDiskRebalancerWithDiffUsageDisk() { olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS, new RangePartitionInfo(), new HashDistributionInfo()); - db.createTable(olapTable); + db.registerTable(olapTable); // 1 table, 3 partitions p0,p1,p2 MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null); diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java index 613510c0f58b8b..52ccb90a12c778 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalanceTest.java @@ -152,7 +152,7 @@ long ignored() { olapTable = new OlapTable(2, "fake table", new ArrayList<>(), KeysType.DUP_KEYS, new RangePartitionInfo(), new HashDistributionInfo()); - db.createTable(olapTable); + db.registerTable(olapTable); // 1 table, 3 partitions p0,p1,p2 MaterializedIndex materializedIndex = new MaterializedIndex(olapTable.getId(), null); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java index b3ae0afe8bf698..b16879625cdea9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java @@ -123,7 +123,7 @@ public static Database createDb(long dbId, long tableId, long partitionId, long // db Database db = new Database(dbId, DB_NAME); - db.createTable(table); + db.registerTable(table); return db; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index d801f55fc7cfa3..5144ddb5a1a5c4 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -216,11 +216,11 @@ private static Env newDelegateCatalog() { //EasyMock.expect(catalog.getAuth()).andReturn(paloAuth).anyTimes(); Database db = new Database(testDbId, "testDb"); OlapTable table = newTable(TABLE_NAME); - db.createTable(table); + db.registerTable(table); OlapTable table1 = newTable(TABLE_NAME + 1); - db.createTable(table1); + db.registerTable(table1); EsTable esTable = newEsTable("es_table"); - db.createTable(esTable); + db.registerTable(esTable); InternalCatalog internalCatalog = Deencapsulation.newInstance(InternalCatalog.class); new Expectations(internalCatalog) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java index 182596c3b63d1c..359775a8009646 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java @@ -268,22 +268,22 @@ Env getCurrentEnv() { db = ((InternalCatalog) env.getCurrentCatalog()).getDbNullable(fullDbName); // table and view init use analyzer, should init after analyzer build OlapTable tbl1 = createOrderTable(); - db.createTable(tbl1); + db.registerTable(tbl1); OlapTable tbl2 = createProfileTable(); - db.createTable(tbl2); + db.registerTable(tbl2); OlapTable tbl3 = createEventTable(); - db.createTable(tbl3); + db.registerTable(tbl3); // build view meta inline sql and create view directly, the originStmt from inline sql // should be analyzed by create view statement analyzer and then to sql View view1 = createEventView1(); - db.createTable(view1); + db.registerTable(view1); View view2 = createEventView2(); - db.createTable(view2); + db.registerTable(view2); View view3 = createEventView3(); - db.createTable(view3); + db.registerTable(view3); View view4 = createEventNestedView(); - db.createTable(view4); + db.registerTable(view4); } private OlapTable createOrderTable() {