From 6142a539f480a1fd8d8ca89f76e372a52f4d2d22 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 24 Nov 2023 23:18:07 +0800 Subject: [PATCH] [deprecated](external) remove deprecated hudi and iceberg external table (#27456) The creation of hudi and iceberg table is disallowed since v1.2. All these features are covered by hudi/iceberg catalog. We should remove the code in v2.1 The PR mainly changes: 1. remove the code of hudi/iceberg external table. 2. remove code of iceberg database. 3. disallowed hive external table's creation. 4. disabled odbc,mysql,broker external table by default, and add FE config `disable_odbc_mysql_broker_table` to control it --- docs/en/docs/admin-manual/config/fe-config.md | 49 +-- .../Create/CREATE-DATABASE.md | 22 +- .../Create/CREATE-EXTERNAL-TABLE.md | 274 ----------------- docs/sidebars.json | 3 +- .../docs/admin-manual/config/fe-config.md | 49 +-- .../Create/CREATE-DATABASE.md | 24 +- .../Create/CREATE-EXTERNAL-TABLE.md | 272 ----------------- .../java/org/apache/doris/common/Config.java | 32 +- fe/fe-core/src/main/cup/sql_parser.cup | 5 - .../doris/analysis/CreateResourceStmt.java | 4 +- .../doris/analysis/CreateTableStmt.java | 40 +-- .../doris/catalog/DatabaseProperty.java | 48 --- .../java/org/apache/doris/catalog/Env.java | 18 -- .../catalog/HiveMetaStoreClientHelper.java | 130 -------- .../apache/doris/catalog/IcebergTable.java | 266 ----------------- .../apache/doris/catalog/RefreshManager.java | 71 +---- .../java/org/apache/doris/catalog/Table.java | 5 - .../doris/datasource/InternalCatalog.java | 22 +- .../doris/external/ExternalScanRange.java | 26 -- .../apache/doris/external/hudi/HudiTable.java | 110 ------- .../iceberg/DorisIcebergException.java | 32 -- .../doris/external/iceberg/HiveCatalog.java | 91 ------ .../external/iceberg/IcebergCatalog.java | 67 ----- .../external/iceberg/IcebergCatalogMgr.java | 220 -------------- .../iceberg/IcebergTableCreationRecord.java | 88 ------ .../IcebergTableCreationRecordMgr.java | 279 ------------------ .../external/iceberg/util/IcebergUtils.java | 204 ------------- .../java/org/apache/doris/fs/FileSystem.java | 9 +- .../org/apache/doris/fs/LocalFileSystem.java | 5 + .../apache/doris/fs/remote/RemoteFile.java | 4 - .../org/apache/doris/qe/ShowExecutor.java | 14 - .../org/apache/doris/alter/AlterTest.java | 3 +- .../analysis/CreateResourceStmtTest.java | 2 +- .../doris/analysis/CreateTableStmtTest.java | 56 ---- .../doris/catalog/CreateTableLikeTest.java | 2 + .../doris/catalog/EnvOperationTest.java | 2 +- .../catalog/OdbcCatalogResourceTest.java | 2 +- .../apache/doris/planner/QueryPlanTest.java | 1 + .../doris/planner/StatisticDeriveTest.java | 2 + .../doris/utframe/TestWithFeService.java | 2 +- .../test_push_conjunct_external_table.groovy | 1 + 41 files changed, 48 insertions(+), 2508 deletions(-) delete mode 100644 docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md delete mode 100644 docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/ExternalScanRange.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/iceberg/DorisIcebergException.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/iceberg/HiveCatalog.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalog.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecord.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md index 60a8bfb5ec89d3..772dc25b64a1b5 100644 --- a/docs/en/docs/admin-manual/config/fe-config.md +++ b/docs/en/docs/admin-manual/config/fe-config.md @@ -2296,60 +2296,15 @@ MasterOnly:false multi catalog concurrent file scan size -#### `enable_odbc_table` +#### `enable_odbc_mysql_broker_table` Default:false IsMutable:true -MasterOnly:true - -Whether to enable the ODBC table, it is not enabled by default. You need to manually configure it when you use it. - -This parameter can be set by: ADMIN SET FRONTEND CONFIG("key"="value") - -**Note:** This parameter has been deleted in version 1.2. The ODBC External Table is enabled by default, and the ODBC External Table will be deleted in a later version. It is recommended to use the JDBC External Table - -#### `disable_iceberg_hudi_table` - -Default:true - -IsMutable:true - -MasterOnly:false - -Starting from version 1.2, we no longer support create hudi and iceberg External Table. Please use the multi catalog. - -#### `iceberg_table_creation_interval_second` - -Default:10 (s) - -IsMutable:true - MasterOnly:false -fe will create iceberg table every iceberg_table_creation_interval_second - -#### `iceberg_table_creation_strict_mode` - -Default:true - -IsMutable:true - -MasterOnly:true - -If set to TRUE, the column definitions of iceberg table and the doris table must be consistent -If set to FALSE, Doris only creates columns of supported data types. - -#### `max_iceberg_table_creation_record_size` - -Default max number of recent iceberg database table creation record that can be stored in memory. - -Default:2000 - -IsMutable:true - -MasterOnly:true +Starting from version 2.1, we no longer support create odbc, jdbc and broker external table. For odbc and mysql external table, use jdbc table or jdbc catalog instead. For broker table, use table valued function instead. #### `max_hive_partition_cache_num` diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-DATABASE.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-DATABASE.md index 0305b197db851c..49b7cf0537b9bc 100644 --- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-DATABASE.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-DATABASE.md @@ -43,22 +43,6 @@ CREATE DATABASE [IF NOT EXISTS] db_name `PROPERTIES` Additional information about the database, which can be defaulted. -- If you create an Iceberg database, you need to provide the following information in properties: - - ```sql - PROPERTIES ( - "iceberg.database" = "iceberg_db_name", - "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", - "iceberg.catalog.type" = "HIVE_CATALOG" - ) - ```` - - illustrate: - - - `ceberg.database` : the library name corresponding to Iceberg; - - `iceberg.hive.metastore.uris` : hive metastore service address; - - `iceberg.catalog.type`: The default is `HIVE_CATALOG`; currently only `HIVE_CATALOG` is supported, and more Iceberg catalog types will be supported in the future. - - If you want to specify the default replica distribution for tables in db, you need to specify `replication_allocation` (the `replication_allocation` attribute of table will have higher priority than db) ```sql @@ -75,14 +59,12 @@ CREATE DATABASE [IF NOT EXISTS] db_name CREATE DATABASE db_test; ```` -2. Create a new Iceberg database iceberg_test +2. Create a new database with default replica distribution: ```sql CREATE DATABASE `iceberg_test` PROPERTIES ( - "iceberg.database" = "doris", - "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", - "iceberg.catalog.type" = "HIVE_CATALOG" + "replication_allocation" = "tag.location.group_1:3" ); ```` diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md deleted file mode 100644 index 80417979639d39..00000000000000 --- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md +++ /dev/null @@ -1,274 +0,0 @@ ---- -{ - "title": "CREATE-EXTERNAL-TABLE", - "language": "en" -} ---- - - - -## CREATE-EXTERNAL-TABLE - -### Name - -CREATE EXTERNAL TABLE - -### Description - -This statement is used to create an external table, see [CREATE TABLE](./CREATE-TABLE.md) for the specific syntax. - -Which type of external table is mainly identified by the ENGINE type, currently MYSQL, BROKER, HIVE, ICEBERG, HUDI are optional - -1. If it is mysql, you need to provide the following information in properties: - - ```sql - PROPERTIES ( - "host" = "mysql_server_host", - "port" = "mysql_server_port", - "user" = "your_user_name", - "password" = "your_password", - "database" = "database_name", - "table" = "table_name" - ) - ```` - and there is an optional propertiy "charset" which can set character fom mysql connection, default value is "utf8". You can set another value "utf8mb4" instead of "utf8" when you need. - - Notice: - - - "table_name" in "table" entry is the real table name in mysql. The table_name in the CREATE TABLE statement is the name of the mysql table in Doris, which can be different. - - - The purpose of creating a mysql table in Doris is to access the mysql database through Doris. Doris itself does not maintain or store any mysql data. - -2. If it is a broker, it means that the access to the table needs to pass through the specified broker, and the following information needs to be provided in properties: - - ```sql - PROPERTIES ( - "broker_name" = "broker_name", - "path" = "file_path1[,file_path2]", - "column_separator" = "value_separator" - "line_delimiter" = "value_delimiter" - ) - ```` - - In addition, you need to provide the Property information required by the Broker, and pass it through the BROKER PROPERTIES, for example, HDFS needs to pass in - - ```sql - BROKER PROPERTIES( - "username" = "name", - "password" = "password" - ) - ```` - - According to different Broker types, the content that needs to be passed in is also different. - - Notice: - - - If there are multiple files in "path", separate them with comma [,]. If the filename contains a comma, use %2c instead. If the filename contains %, use %25 instead - - Now the file content format supports CSV, and supports GZ, BZ2, LZ4, LZO (LZOP) compression formats. - -3. If it is hive, you need to provide the following information in properties: - - ```sql - PROPERTIES ( - "database" = "hive_db_name", - "table" = "hive_table_name", - "hive.metastore.uris" = "thrift://127.0.0.1:9083" - ) - ```` - - Where database is the name of the library corresponding to the hive table, table is the name of the hive table, and hive.metastore.uris is the address of the hive metastore service. - -4. In case of iceberg, you need to provide the following information in properties: - - ```sql - PROPERTIES ( - "iceberg.database" = "iceberg_db_name", - "iceberg.table" = "iceberg_table_name", - "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", - "iceberg.catalog.type" = "HIVE_CATALOG" - ) - ```` - - Where database is the library name corresponding to Iceberg; - table is the corresponding table name in Iceberg; - hive.metastore.uris is the hive metastore service address; - catalog.type defaults to HIVE_CATALOG. Currently only HIVE_CATALOG is supported, more Iceberg catalog types will be supported in the future. - -5. In case of hudi, you need to provide the following information in properties: - - ```sql - PROPERTIES ( - "hudi.database" = "hudi_db_in_hive_metastore", - "hudi.table" = "hudi_table_in_hive_metastore", - "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083" - ) - ```` - - Where hudi.database is the corresponding database name in HiveMetaStore; - hudi.table is the corresponding table name in HiveMetaStore; - hive.metastore.uris is the hive metastore service address; - -### Example - -1. Create a MYSQL external table - - Create mysql table directly from outer table information - - ```sql - CREATE EXTERNAL TABLE example_db.table_mysql - ( - k1 DATE, - k2 INT, - k3 SMALLINT, - k4 VARCHAR(2048), - k5 DATETIME - ) - ENGINE=mysql - PROPERTIES - ( - "host" = "127.0.0.1", - "port" = "8239", - "user" = "mysql_user", - "password" = "mysql_passwd", - "database" = "mysql_db_test", - "table" = "mysql_table_test", - "charset" = "utf8mb4" - ) - ```` - - Create mysql table through External Catalog Resource - - ```sql - # Create Resource first - CREATE EXTERNAL RESOURCE "mysql_resource" - PROPERTIES - ( - "type" = "odbc_catalog", - "user" = "mysql_user", - "password" = "mysql_passwd", - "host" = "127.0.0.1", - "port" = "8239" - ); - - # Then create mysql external table through Resource - CREATE EXTERNAL TABLE example_db.table_mysql - ( - k1 DATE, - k2 INT, - k3 SMALLINT, - k4 VARCHAR(2048), - k5 DATETIME - ) - ENGINE=mysql - PROPERTIES - ( - "odbc_catalog_resource" = "mysql_resource", - "database" = "mysql_db_test", - "table" = "mysql_table_test" - ) - ```` - -2. Create a broker external table with data files stored on HDFS, the data is split with "|", and "\n" is newline - - ```sql - CREATE EXTERNAL TABLE example_db.table_broker ( - k1 DATE, - k2 INT, - k3 SMALLINT, - k4 VARCHAR(2048), - k5 DATETIME - ) - ENGINE=broker - PROPERTIES ( - "broker_name" = "hdfs", - "path" = "hdfs://hdfs_host:hdfs_port/data1,hdfs://hdfs_host:hdfs_port/data2,hdfs://hdfs_host:hdfs_port/data3%2c4", - "column_separator" = "|", - "line_delimiter" = "\n" - ) - BROKER PROPERTIES ( - "username" = "hdfs_user", - "password" = "hdfs_password" - ) - ```` - -3. Create a hive external table - - ```sql - CREATE TABLE example_db.table_hive - ( - k1 TINYINT, - k2 VARCHAR(50), - v INT - ) - ENGINE=hive - PROPERTIES - ( - "database" = "hive_db_name", - "table" = "hive_table_name", - "hive.metastore.uris" = "thrift://127.0.0.1:9083" - ); - ```` - -4. Create an Iceberg skin - - ```sql - CREATE TABLE example_db.t_iceberg - ENGINE=ICEBERG - PROPERTIES ( - "iceberg.database" = "iceberg_db", - "iceberg.table" = "iceberg_table", - "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", - "iceberg.catalog.type" = "HIVE_CATALOG" - ); - ```` - -5. Create an Hudi external table - - create hudi table without schema(recommend) - ```sql - CREATE TABLE example_db.t_hudi - ENGINE=HUDI - PROPERTIES ( - "hudi.database" = "hudi_db_in_hive_metastore", - "hudi.table" = "hudi_table_in_hive_metastore", - "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083" - ); - ```` - - create hudi table with schema - ```sql - CREATE TABLE example_db.t_hudi ( - `id` int NOT NULL COMMENT "id number", - `name` varchar(10) NOT NULL COMMENT "user name" - ) - ENGINE=HUDI - PROPERTIES ( - "hudi.database" = "hudi_db_in_hive_metastore", - "hudi.table" = "hudi_table_in_hive_metastore", - "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083" - ); - ```` - -### Keywords - - CREATE, EXTERNAL, TABLE - -### Best Practice - diff --git a/docs/sidebars.json b/docs/sidebars.json index 0c3855ec5940c1..2efc051f686569 100644 --- a/docs/sidebars.json +++ b/docs/sidebars.json @@ -909,7 +909,6 @@ "sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE", "sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-LIKE", "sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE-AS-SELECT", - "sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE", "sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-INDEX", "sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-VIEW", "sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-MATERIALIZED-VIEW", @@ -1342,4 +1341,4 @@ ] } ] -} \ No newline at end of file +} diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md index e22b995496be6b..bf533b21bf621c 100644 --- a/docs/zh-CN/docs/admin-manual/config/fe-config.md +++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md @@ -2297,60 +2297,15 @@ multi catalog 并发文件扫描线程数 multi catalog 并发文件扫描大小 -#### `enable_odbc_table` +#### `enable_odbc_mysql_broker_table` 默认值:false 是否可以动态配置:true -是否为 Master FE 节点独有的配置项:true - -是否启用 ODBC 表,默认不启用,在使用的时候需要手动配置启用,该参数可以通过: - -`ADMIN SET FRONTEND CONFIG("key"="value") `方式进行设置 - -**注意:** 这个参数在1.2版本中已经删除,默认启用ODBC外表,并且会在以后的某个版本中删除ODBC外表,推荐使用JDBC外表 - -#### `disable_iceberg_hudi_table` - -默认值:true - -是否可以动态配置:true - -是否为 Master FE 节点独有的配置项:false - -从 1.2 版本开始,我们不再支持创建hudi和iceberg外表。请改用multi catalog功能。 - -#### `iceberg_table_creation_interval_second` - -默认值:10 (s) - -是否可以动态配置:true - 是否为 Master FE 节点独有的配置项:false -fe 将每隔 iceberg_table_creation_interval_second 创建iceberg table - -#### `iceberg_table_creation_strict_mode` - -默认值:true - -是否可以动态配置:true - -是否为 Master FE 节点独有的配置项:true - -如果设置为 true,iceberg 表和 Doris 表的列定义必须一致。 -如果设置为 false,Doris 只创建支持的数据类型的列。 - -#### `max_iceberg_table_creation_record_size` - -内存中可以存储的最近iceberg库表创建记录的默认最大数量 - -默认值:2000 - -是否可以动态配置:true - -是否为 Master FE 节点独有的配置项:true +从 2.1 版本开始,我们不再支持创建 odbc, mysql 和 broker外表。对于 odbc 外表,可以使用 jdbc 外表或者 jdbc catalog 替代。对于 broker 外表,可以使用 table valued function 替代。 #### `max_hive_partition_cache_num` diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-DATABASE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-DATABASE.md index fafe4d5846fcc5..32977bdf957e2d 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-DATABASE.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-DATABASE.md @@ -43,22 +43,6 @@ CREATE DATABASE [IF NOT EXISTS] db_name `PROPERTIES` 该数据库的附加信息,可以缺省。 -- 如果创建 Iceberg 数据库,则需要在 properties 中提供以下信息: - - ```sql - PROPERTIES ( - "iceberg.database" = "iceberg_db_name", - "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", - "iceberg.catalog.type" = "HIVE_CATALOG" - ) - ``` - - 参数说明: - - - `ceberg.database` :Iceberg 对应的库名; - - `iceberg.hive.metastore.uris` :hive metastore 服务地址; - - `iceberg.catalog.type`: 默认为 `HIVE_CATALOG`;当前仅支持 `HIVE_CATALOG`,后续会支持更多 Iceberg catalog 类型。 - - 如果要为db下的table指定默认的副本分布策略,需要指定`replication_allocation`(table的`replication_allocation`属性优先级会高于db) ```sql @@ -75,14 +59,12 @@ CREATE DATABASE [IF NOT EXISTS] db_name CREATE DATABASE db_test; ``` -2. 新建 Iceberg 数据库 iceberg_test +2. 新建数据库并设置默认的副本分布: ```sql - CREATE DATABASE `iceberg_test` + CREATE DATABASE `db_test` PROPERTIES ( - "iceberg.database" = "doris", - "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", - "iceberg.catalog.type" = "HIVE_CATALOG" + "replication_allocation" = "tag.location.group_1:3" ); ``` diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md deleted file mode 100644 index 3f32a452ac381f..00000000000000 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-EXTERNAL-TABLE.md +++ /dev/null @@ -1,272 +0,0 @@ ---- -{ - "title": "CREATE-EXTERNAL-TABLE", - "language": "zh-CN" -} ---- - - - -## CREATE-EXTERNAL-TABLE - -### Name - -CREATE EXTERNAL TABLE - -### Description - -此语句用来创建外部表,具体语法参阅 [CREATE TABLE](./CREATE-TABLE.md)。 - -主要通过 ENGINE 类型来标识是哪种类型的外部表,目前可选 MYSQL、BROKER、HIVE、ICEBERG -、HUDI -1. 如果是 mysql,则需要在 properties 提供以下信息: - - ```sql - PROPERTIES ( - "host" = "mysql_server_host", - "port" = "mysql_server_port", - "user" = "your_user_name", - "password" = "your_password", - "database" = "database_name", - "table" = "table_name" - ) - ``` - 以及一个可选属性"charset",可以用来设置mysql连接的字符集, 默认值是"utf8"。如有需要,你可以设置为另外一个字符集"utf8mb4"。 - - 注意: - - - "table" 条目中的 "table_name" 是 mysql 中的真实表名。而 CREATE TABLE 语句中的 table_name 是该 mysql 表在 Doris 中的名字,可以不同。 - - - 在 Doris 创建 mysql 表的目的是可以通过 Doris 访问 mysql 数据库。而 Doris 本身并不维护、存储任何 mysql 数据。 - -2. 如果是 broker,表示表的访问需要通过指定的broker, 需要在 properties 提供以下信息: - - ```sql - PROPERTIES ( - "broker_name" = "broker_name", - "path" = "file_path1[,file_path2]", - "column_separator" = "value_separator" - "line_delimiter" = "value_delimiter" - ) - ``` - - 另外还需要提供Broker需要的Property信息,通过BROKER PROPERTIES来传递,例如HDFS需要传入 - - ```sql - BROKER PROPERTIES( - "username" = "name", - "password" = "password" - ) - ``` - - 这个根据不同的Broker类型,需要传入的内容也不相同 - - 注意: - - - "path" 中如果有多个文件,用逗号[,]分割。如果文件名中包含逗号,那么使用 %2c 来替代。如果文件名中包含 %,使用 %25 代替 - - 现在文件内容格式支持CSV,支持GZ,BZ2,LZ4,LZO(LZOP) 压缩格式。 - -3. 如果是 hive,则需要在 properties 提供以下信息: - - ```sql - PROPERTIES ( - "database" = "hive_db_name", - "table" = "hive_table_name", - "hive.metastore.uris" = "thrift://127.0.0.1:9083" - ) - ``` - - 其中 database 是 hive 表对应的库名字,table 是 hive 表的名字,hive.metastore.uris 是 hive metastore 服务地址。 - -4. 如果是 iceberg,则需要在 properties 中提供以下信息: - - ```sql - PROPERTIES ( - "iceberg.database" = "iceberg_db_name", - "iceberg.table" = "iceberg_table_name", - "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", - "iceberg.catalog.type" = "HIVE_CATALOG" - ) - ``` - - 其中 database 是 Iceberg 对应的库名; - table 是 Iceberg 中对应的表名; - hive.metastore.uris 是 hive metastore 服务地址; - catalog.type 默认为 HIVE_CATALOG。当前仅支持 HIVE_CATALOG,后续会支持更多 Iceberg catalog 类型。 - -5. 如果是 hudi,则需要在 properties 中提供以下信息: - - ```sql - PROPERTIES ( - "hudi.database" = "hudi_db_in_hive_metastore", - "hudi.table" = "hudi_table_in_hive_metastore", - "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083" - ) - ```` - - 其中 hudi.database 是 hive 表对应的库名字,hudi.table 是 hive 表的名字,hive.metastore.uris 是 hive metastore 服务地址。 - -### Example - -1. 创建MYSQL外部表 - - 直接通过外表信息创建mysql表 - - ```sql - CREATE EXTERNAL TABLE example_db.table_mysql - ( - k1 DATE, - k2 INT, - k3 SMALLINT, - k4 VARCHAR(2048), - k5 DATETIME - ) - ENGINE=mysql - PROPERTIES - ( - "host" = "127.0.0.1", - "port" = "8239", - "user" = "mysql_user", - "password" = "mysql_passwd", - "database" = "mysql_db_test", - "table" = "mysql_table_test", - "charset" = "utf8mb4" - ) - ``` - - 通过External Catalog Resource创建mysql表 - - ```sql - # 先创建Resource - CREATE EXTERNAL RESOURCE "mysql_resource" - PROPERTIES - ( - "type" = "odbc_catalog", - "user" = "mysql_user", - "password" = "mysql_passwd", - "host" = "127.0.0.1", - "port" = "8239" - ); - - # 再通过Resource创建mysql外部表 - CREATE EXTERNAL TABLE example_db.table_mysql - ( - k1 DATE, - k2 INT, - k3 SMALLINT, - k4 VARCHAR(2048), - k5 DATETIME - ) - ENGINE=mysql - PROPERTIES - ( - "odbc_catalog_resource" = "mysql_resource", - "database" = "mysql_db_test", - "table" = "mysql_table_test" - ) - ``` - -2. 创建一个数据文件存储在HDFS上的 broker 外部表, 数据使用 "|" 分割,"\n" 换行 - - ```sql - CREATE EXTERNAL TABLE example_db.table_broker ( - k1 DATE, - k2 INT, - k3 SMALLINT, - k4 VARCHAR(2048), - k5 DATETIME - ) - ENGINE=broker - PROPERTIES ( - "broker_name" = "hdfs", - "path" = "hdfs://hdfs_host:hdfs_port/data1,hdfs://hdfs_host:hdfs_port/data2,hdfs://hdfs_host:hdfs_port/data3%2c4", - "column_separator" = "|", - "line_delimiter" = "\n" - ) - BROKER PROPERTIES ( - "username" = "hdfs_user", - "password" = "hdfs_password" - ) - ``` - -3. 创建一个hive外部表 - - ```sql - CREATE TABLE example_db.table_hive - ( - k1 TINYINT, - k2 VARCHAR(50), - v INT - ) - ENGINE=hive - PROPERTIES - ( - "database" = "hive_db_name", - "table" = "hive_table_name", - "hive.metastore.uris" = "thrift://127.0.0.1:9083" - ); - ``` - -4. 创建一个 Iceberg 外表 - - ```sql - CREATE TABLE example_db.t_iceberg - ENGINE=ICEBERG - PROPERTIES ( - "iceberg.database" = "iceberg_db", - "iceberg.table" = "iceberg_table", - "iceberg.hive.metastore.uris" = "thrift://127.0.0.1:9083", - "iceberg.catalog.type" = "HIVE_CATALOG" - ); - ``` - -5. 创建一个 Hudi 外表 - - 创建时不指定schema(推荐) - ```sql - CREATE TABLE example_db.t_hudi - ENGINE=HUDI - PROPERTIES ( - "hudi.database" = "hudi_db_in_hive_metastore", - "hudi.table" = "hudi_table_in_hive_metastore", - "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083" - ); - ```` - - 创建时指定schema - ```sql - CREATE TABLE example_db.t_hudi ( - `id` int NOT NULL COMMENT "id number", - `name` varchar(10) NOT NULL COMMENT "user name" - ) - ENGINE=HUDI - PROPERTIES ( - "hudi.database" = "hudi_db_in_hive_metastore", - "hudi.table" = "hudi_table_in_hive_metastore", - "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083" - ); - ```` - -### Keywords - - CREATE, EXTERNAL, TABLE - -### Best Practice - diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index df2cafa4c099ec..0b71947dea4a75 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -924,12 +924,6 @@ public class Config extends ConfigBase { @ConfField public static long es_state_sync_interval_second = 10; - /** - * fe will create iceberg table every iceberg_table_creation_interval_second - */ - @ConfField(mutable = true, masterOnly = true) - public static long iceberg_table_creation_interval_second = 10; - /** * the factor of delay time before deciding to repair tablet. * if priority is VERY_HIGH, repair it immediately. @@ -1505,14 +1499,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static long min_bytes_indicate_replica_too_large = 2 * 1024 * 1024 * 1024L; - /** - * If set to TRUE, the column definitions of iceberg table and the doris table must be consistent - * If set to FALSE, Doris only creates columns of supported data types. - * Default is true. - */ - @ConfField(mutable = true, masterOnly = true) - public static boolean iceberg_table_creation_strict_mode = true; - // statistics /* * the max unfinished statistics job number @@ -1761,12 +1747,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static long max_backend_heartbeat_failure_tolerance_count = 1; - /** - * The iceberg and hudi table will be removed in v1.3 - * Use multi catalog instead. - */ - @ConfField(mutable = true, masterOnly = false) - public static boolean disable_iceberg_hudi_table = true; + @ConfField(mutable = true, masterOnly = false, description = { + "禁止创建odbc, mysql, broker类型的外表", "Disallow the creation of odbc, mysql, broker type external tables"}) + public static boolean enable_odbc_mysql_broker_table = false; /** * The default connection timeout for hive metastore. @@ -1872,15 +1855,6 @@ public class Config extends ConfigBase { @ConfField(mutable = false, varType = VariableAnnotation.EXPERIMENTAL) public static boolean enable_fqdn_mode = false; - /** - * enable use odbc table - */ - @ConfField(mutable = true, masterOnly = true, description = { - "是否开启 ODBC 外表功能,默认关闭,ODBC 外表是淘汰的功能,请使用 JDBC Catalog", - "Whether to enable the ODBC appearance function, it is disabled by default," - + " and the ODBC appearance is an obsolete feature. Please use the JDBC Catalog"}) - public static boolean enable_odbc_table = false; - /** * This is used whether to push down function to MYSQL in external Table with query sql * like odbc, jdbc for mysql table diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index d6eef426ef7bb0..1cdedf23952df7 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -1811,11 +1811,6 @@ create_stmt ::= RESULT = new CreateTableStmt(ifNotExists, isExternal, name, columns, indexes, engineName, keys, partition, distribution, tblProperties, extProperties, tableComment, index); :} - | KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name KW_ENGINE EQUAL ident:engineName - properties:properties opt_comment:tableComment - {: - RESULT = new CreateTableStmt(ifNotExists, isExternal, name, engineName, properties, tableComment); - :} | KW_CREATE opt_external:isExternal KW_TABLE opt_if_not_exists:ifNotExists table_name:name opt_col_list:columns opt_engine:engineName diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateResourceStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateResourceStmt.java index 16b7fb99733ebd..76fb41e88d540f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateResourceStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateResourceStmt.java @@ -94,9 +94,9 @@ public void analyze(Analyzer analyzer) throws UserException { if (resourceType == ResourceType.SPARK && !isExternal) { throw new AnalysisException("Spark is external resource"); } - if (resourceType == ResourceType.ODBC_CATALOG && !Config.enable_odbc_table) { + if (resourceType == ResourceType.ODBC_CATALOG && !Config.enable_odbc_mysql_broker_table) { throw new AnalysisException("ODBC table is deprecated, use JDBC instead. Or you can set " - + "`enable_odbc_table=true` in fe.conf to enable ODBC again."); + + "`enable_odbc_table=true` in fe.conf to enable ODBC again."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java index 602793b7eb764c..a5c3e893405dbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java @@ -91,14 +91,11 @@ public class CreateTableStmt extends DdlStmt { static { engineNames = Sets.newHashSet(); engineNames.add("olap"); + engineNames.add("jdbc"); + engineNames.add("elasticsearch"); engineNames.add("odbc"); engineNames.add("mysql"); engineNames.add("broker"); - engineNames.add("elasticsearch"); - engineNames.add("hive"); - engineNames.add("iceberg"); - engineNames.add("hudi"); - engineNames.add("jdbc"); } // if auto bucket auto bucket enable, rewrite distribution bucket num && @@ -200,22 +197,6 @@ public CreateTableStmt(boolean ifNotExists, this.rollupAlterClauseList = (rollupAlterClauseList == null) ? Lists.newArrayList() : rollupAlterClauseList; } - // This is for iceberg/hudi table, which has no column schema - public CreateTableStmt(boolean ifNotExists, - boolean isExternal, - TableName tableName, - String engineName, - Map properties, - String comment) { - this.ifNotExists = ifNotExists; - this.isExternal = isExternal; - this.tableName = tableName; - this.engineName = engineName; - this.properties = properties; - this.columnDefs = Lists.newArrayList(); - this.comment = Strings.nullToEmpty(comment); - } - // for Nereids public CreateTableStmt(boolean ifNotExists, boolean isExternal, @@ -481,7 +462,7 @@ public void analyze(Analyzer analyzer) throws UserException, AnalysisException { } // analyze column def - if (!(engineName.equals("iceberg") || engineName.equals("hudi") || engineName.equals("elasticsearch")) + if (!(engineName.equals("elasticsearch")) && (columnDefs == null || columnDefs.isEmpty())) { ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS); } @@ -662,11 +643,7 @@ private void analyzeEngineName() throws AnalysisException { if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker") || engineName.equals("elasticsearch") || engineName.equals("hive") - || engineName.equals("iceberg") || engineName.equals("hudi") || engineName.equals("jdbc")) { - if (engineName.equals("odbc") && !Config.enable_odbc_table) { - throw new AnalysisException("ODBC table is deprecated, use JDBC instead. Or you can set " - + "`enable_odbc_table=true` in fe.conf to enable ODBC again."); - } + || engineName.equals("jdbc")) { if (!isExternal) { // this is for compatibility isExternal = true; @@ -679,10 +656,13 @@ private void analyzeEngineName() throws AnalysisException { } } - if (Config.disable_iceberg_hudi_table && (engineName.equals("iceberg") || engineName.equals("hudi"))) { + if (!Config.enable_odbc_mysql_broker_table && (engineName.equals("odbc") + || engineName.equals("mysql") || engineName.equals("broker"))) { throw new AnalysisException( - "iceberg and hudi table is no longer supported. Use multi catalog feature instead." - + ". Or you can temporarily set 'disable_iceberg_hudi_table=false'" + "odbc, mysql and broker table is no longer supported." + + " For odbc and mysql external table, use jdbc table or jdbc catalog instead." + + " For broker table, use table valued function instead." + + ". Or you can temporarily set 'disable_odbc_mysql_broker_table=false'" + " in fe.conf to reopen this feature."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java index ef664fd491e313..3aeaeb43862c62 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DatabaseProperty.java @@ -17,23 +17,16 @@ package org.apache.doris.catalog; -import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.external.iceberg.IcebergCatalog; -import org.apache.doris.external.iceberg.IcebergCatalogMgr; import org.apache.doris.persist.gson.GsonUtils; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; import java.util.Map; /** @@ -44,16 +37,10 @@ * such as `checkAndBuildIcebergProperty` to check and build it. */ public class DatabaseProperty implements Writable { - private static final Logger LOG = LogManager.getLogger(DatabaseProperty.class); - - public static final String ICEBERG_PROPERTY_PREFIX = "iceberg"; @SerializedName(value = "properties") private Map properties = Maps.newHashMap(); - // the following variables are built from "properties" - private IcebergProperty icebergProperty = new IcebergProperty(Maps.newHashMap()); - public DatabaseProperty() { } @@ -78,30 +65,6 @@ public Map getProperties() { return properties; } - public IcebergProperty getIcebergProperty() { - return icebergProperty; - } - - public DatabaseProperty checkAndBuildProperties() throws DdlException { - Map icebergProperties = new HashMap<>(); - for (Map.Entry entry : this.properties.entrySet()) { - if (entry.getKey().startsWith(ICEBERG_PROPERTY_PREFIX)) { - if (Config.disable_iceberg_hudi_table) { - throw new DdlException( - "database for iceberg is no longer supported. Use multi catalog feature instead." - + ". Or you can temporarily set 'disable_iceberg_hudi_table=false'" - + " in fe.conf to reopen this feature."); - } else { - icebergProperties.put(entry.getKey(), entry.getValue()); - } - } - } - if (icebergProperties.size() > 0) { - checkAndBuildIcebergProperty(icebergProperties); - } - return this; - } - public BinlogConfig getBinlogConfig() { BinlogConfig binlogConfig = new BinlogConfig(); binlogConfig.mergeFromProperties(properties); @@ -112,17 +75,6 @@ public void updateProperties(Map newProperties) { properties.putAll(newProperties); } - private void checkAndBuildIcebergProperty(Map properties) throws DdlException { - IcebergCatalogMgr.validateProperties(properties, false); - icebergProperty = new IcebergProperty(properties); - String icebergDb = icebergProperty.getDatabase(); - IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); - // check database exists - if (!icebergCatalog.databaseExists(icebergDb)) { - throw new DdlException("Database [" + icebergDb + "] dose not exist in Iceberg."); - } - } - @Override public void write(DataOutput out) throws IOException { Text.writeString(out, GsonUtils.GSON.toJson(this)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index bed741c6e4670c..6ddeb3977a0422 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -139,7 +139,6 @@ import org.apache.doris.deploy.impl.K8sDeployManager; import org.apache.doris.deploy.impl.LocalFileDeployManager; import org.apache.doris.external.elasticsearch.EsRepository; -import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.ha.HAProtocol; @@ -1570,7 +1569,6 @@ private void startMasterOnlyDaemonThreads() { } streamLoadRecordMgr.start(); tabletLoadIndexRecorderMgr.start(); - getInternalCatalog().getIcebergTableCreationRecordMgr().start(); new InternalSchemaInitializer().start(); if (Config.enable_hms_events_incremental_sync) { metastoreEventsProcessor.start(); @@ -3397,18 +3395,6 @@ public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, Lis sb.append("\"table\" = \"").append(hiveTable.getHiveTable()).append("\",\n"); sb.append(new PrintableMap<>(hiveTable.getHiveProperties(), " = ", true, true, hidePassword).toString()); sb.append("\n)"); - } else if (table.getType() == TableType.ICEBERG) { - IcebergTable icebergTable = (IcebergTable) table; - - addTableComment(icebergTable, sb); - - // properties - sb.append("\nPROPERTIES (\n"); - sb.append("\"iceberg.database\" = \"").append(icebergTable.getIcebergDb()).append("\",\n"); - sb.append("\"iceberg.table\" = \"").append(icebergTable.getIcebergTbl()).append("\",\n"); - sb.append(new PrintableMap<>(icebergTable.getIcebergProperties(), - " = ", true, true, hidePassword).toString()); - sb.append("\n)"); } else if (table.getType() == TableType.JDBC) { JdbcTable jdbcTable = (JdbcTable) table; addTableComment(jdbcTable, sb); @@ -3795,10 +3781,6 @@ public TabletLoadIndexRecorderMgr getTabletLoadIndexRecorderMgr() { return tabletLoadIndexRecorderMgr; } - public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() { - return getInternalCatalog().getIcebergTableCreationRecordMgr(); - } - public MasterTaskExecutor getPendingLoadTaskScheduler() { return pendingLoadTaskScheduler; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 8aa7d3909004a9..96419cd0b0ad27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -29,30 +29,20 @@ import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; -import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.property.constants.HMSProperties; -import org.apache.doris.fs.FileSystemFactory; -import org.apache.doris.fs.RemoteFiles; -import org.apache.doris.fs.remote.RemoteFile; -import org.apache.doris.fs.remote.RemoteFileSystem; -import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExprOpcode; import com.aliyun.datalake.metastore.common.DataLakeConfig; import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient; import com.google.common.base.Strings; import com.google.common.collect.Maps; -import com.google.common.collect.Queues; import org.apache.avro.Schema; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -60,11 +50,9 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; -import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; @@ -94,7 +82,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -175,123 +162,6 @@ public static IMetaStoreClient getClient(String metaStoreUris) throws DdlExcepti return metaStoreClient; } - /** - * Get data files of partitions in hive table, filter by partition predicate. - * - * @param hiveTable - * @param hivePartitionPredicate - * @param fileStatuses - * @param remoteHiveTbl - * @return - * @throws DdlException - */ - public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate, - List fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type) - throws DdlException { - RemoteFileSystem fs = FileSystemFactory.get("HiveMetaStore", type, hiveTable.getHiveProperties()); - List remoteLocationsList = new ArrayList<>(); - try { - if (remoteHiveTbl.getPartitionKeys().size() > 0) { - String metaStoreUris = hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS); - // hive partitioned table, get file iterator from table partition sd info - List hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, - hivePartitionPredicate); - for (Partition p : hivePartitions) { - String location = normalizeS3LikeSchema(p.getSd().getLocation()); - remoteLocationsList.add(fs.listLocatedFiles(location)); - } - } else { - // hive non-partitioned table, get file iterator from table sd info - String location = normalizeS3LikeSchema(remoteHiveTbl.getSd().getLocation()); - remoteLocationsList.add(fs.listLocatedFiles(location)); - } - return getAllFileStatus(fileStatuses, remoteLocationsList, fs); - } catch (UserException e) { - throw new DdlException(e.getMessage(), e); - } - } - - public static String normalizeS3LikeSchema(String location) { - String[] objectStorages = Config.s3_compatible_object_storages.split(","); - for (String objectStorage : objectStorages) { - if (location.startsWith(objectStorage + "://")) { - location = location.replaceFirst(objectStorage, "s3"); - break; - } - } - return location; - } - - private static String getAllFileStatus(List fileStatuses, - List remoteLocationsList, RemoteFileSystem fs) - throws UserException { - String hdfsUrl = ""; - Queue queue = Queues.newArrayDeque(remoteLocationsList); - while (queue.peek() != null) { - RemoteFiles locs = queue.poll(); - try { - for (RemoteFile fileLocation : locs.files()) { - Path filePath = fileLocation.getPath(); - // hdfs://host:port/path/to/partition/file_name - String fullUri = filePath.toString(); - if (fileLocation.isDirectory()) { - // recursive visit the directory to get the file path. - queue.add(fs.listLocatedFiles(fullUri)); - continue; - } - TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus(); - brokerFileStatus.setIsDir(fileLocation.isDirectory()); - brokerFileStatus.setIsSplitable(true); - brokerFileStatus.setSize(fileLocation.getSize()); - brokerFileStatus.setModificationTime(fileLocation.getModificationTime()); - // filePath.toUri().getPath() = "/path/to/partition/file_name" - // eg: /home/work/dev/hive/apache-hive-2.3.7-bin/data/warehouse - // + /dae.db/customer/state=CA/city=SanJose/000000_0 - // fullUri: Backend need full s3 path (with s3://bucket at the beginning) to read the data on s3. - // path = "s3://bucket/path/to/partition/file_name" - // eg: s3://hive-s3-test/region/region.tbl - String path = fs.needFullPath() ? fullUri : filePath.toUri().getPath(); - brokerFileStatus.setPath(path); - fileStatuses.add(brokerFileStatus); - if (StringUtils.isEmpty(hdfsUrl)) { - // hdfs://host:port - hdfsUrl = fullUri.replace(path, ""); - } - } - } catch (UserException e) { - LOG.warn("List HDFS file IOException: {}", e.getMessage()); - throw new DdlException("List HDFS file failed. Error: " + e.getMessage()); - } - } - return hdfsUrl; - } - - /** - * list partitions from hiveMetaStore. - * - * @param metaStoreUris hiveMetaStore uris - * @param remoteHiveTbl Hive table - * @param hivePartitionPredicate filter when list partitions - * @return a list of hive partitions - * @throws DdlException when connect hiveMetaStore failed. - */ - public static List getHivePartitions(String metaStoreUris, Table remoteHiveTbl, - ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException { - List hivePartitions = new ArrayList<>(); - IMetaStoreClient client = getClient(metaStoreUris); - try { - client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(), - SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate), - null, (short) -1, hivePartitions); - } catch (TException e) { - LOG.warn("Hive metastore thrift exception: {}", e.getMessage()); - throw new DdlException("Connect hive metastore failed: " + e.getMessage()); - } finally { - client.close(); - } - return hivePartitions; - } - public static Table getTable(HiveTable hiveTable) throws DdlException { IMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS)); Table table; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java deleted file mode 100644 index 8d6907ff6a2757..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java +++ /dev/null @@ -1,266 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.catalog; - -import org.apache.doris.analysis.StorageBackend; -import org.apache.doris.common.UserException; -import org.apache.doris.common.io.Text; -import org.apache.doris.common.util.Util; -import org.apache.doris.external.iceberg.IcebergCatalog; -import org.apache.doris.external.iceberg.IcebergCatalogMgr; -import org.apache.doris.thrift.TBrokerFileStatus; -import org.apache.doris.thrift.TIcebergTable; -import org.apache.doris.thrift.TTableDescriptor; -import org.apache.doris.thrift.TTableType; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.lang3.StringUtils; -import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.Schema; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.TableScan; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.expressions.Expression; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * External Iceberg table - */ -public class IcebergTable extends Table { - private static final Logger LOG = LogManager.getLogger(IcebergTable.class); - - // remote Iceberg database name - private String icebergDb; - // remote Iceberg table name - private String icebergTbl; - // remote Iceberg table location - private String location; - // Iceberg table file format - private String fileFormat; - // Iceberg storage type - private StorageBackend.StorageType storageType; - // Iceberg remote host uri - private String hostUri; - // location analyze flag - private boolean isAnalyzed = false; - private Map icebergProperties = Maps.newHashMap(); - - private org.apache.iceberg.Table icebergTable; - - private final byte[] loadLock = new byte[0]; - private final AtomicBoolean isLoaded = new AtomicBoolean(false); - - public IcebergTable() { - super(TableType.ICEBERG); - } - - public IcebergTable(long id, String tableName, List fullSchema, IcebergProperty icebergProperty, - org.apache.iceberg.Table icebergTable) { - super(id, tableName, TableType.ICEBERG, fullSchema); - this.icebergDb = icebergProperty.getDatabase(); - this.icebergTbl = icebergProperty.getTable(); - - icebergProperties.put(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, icebergProperty.getHiveMetastoreUris()); - icebergProperties.put(IcebergProperty.ICEBERG_CATALOG_TYPE, icebergProperty.getCatalogType()); - icebergProperties.putAll(icebergProperty.getDfsProperties()); - this.icebergTable = icebergTable; - } - - public String getIcebergDbTable() { - return String.format("%s.%s", icebergDb, icebergTbl); - } - - public String getIcebergDb() { - return icebergDb; - } - - public String getIcebergTbl() { - return icebergTbl; - } - - public Map getIcebergProperties() { - return icebergProperties; - } - - private void getLocation() throws UserException { - if (Strings.isNullOrEmpty(location)) { - try { - getTable(); - } catch (Exception e) { - throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage()); - } - location = icebergTable.location(); - } - analyzeLocation(); - } - - private void analyzeLocation() throws UserException { - if (isAnalyzed) { - return; - } - String[] strings = StringUtils.split(location, "/"); - - // analyze storage type - String storagePrefix = strings[0].split(":")[0]; - if (Util.isS3CompatibleStorageSchema(storagePrefix)) { - this.storageType = StorageBackend.StorageType.S3; - } else if (storagePrefix.equalsIgnoreCase("hdfs")) { - this.storageType = StorageBackend.StorageType.HDFS; - } else { - throw new UserException("Not supported storage type: " + storagePrefix); - } - - // analyze host uri - // eg: hdfs://host:port - // s3://host:port - String host = strings[1]; - this.hostUri = storagePrefix + "://" + host; - this.isAnalyzed = true; - } - - public String getHostUri() throws UserException { - if (!isAnalyzed) { - getLocation(); - } - return hostUri; - } - - public StorageBackend.StorageType getStorageType() throws UserException { - if (!isAnalyzed) { - getLocation(); - } - return storageType; - } - - public String getFileFormat() throws UserException { - if (Strings.isNullOrEmpty(fileFormat)) { - try { - getTable(); - } catch (Exception e) { - throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage()); - } - fileFormat = icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT); - } - return fileFormat; - } - - public Schema getIcebergSchema() { - return icebergTable.schema(); - } - - private org.apache.iceberg.Table getTable() throws Exception { - if (isLoaded.get()) { - Preconditions.checkNotNull(icebergTable); - return icebergTable; - } - synchronized (loadLock) { - if (icebergTable != null) { - return icebergTable; - } - - IcebergProperty icebergProperty = getIcebergProperty(); - IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); - try { - this.icebergTable = icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl)); - LOG.info("finished to load iceberg table: {}", name); - } catch (Exception e) { - LOG.warn("failed to load iceberg table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e); - throw e; - } - - isLoaded.set(true); - return icebergTable; - } - } - - private IcebergProperty getIcebergProperty() { - Map properties = Maps.newHashMap(icebergProperties); - properties.put(IcebergProperty.ICEBERG_DATABASE, icebergDb); - properties.put(IcebergProperty.ICEBERG_TABLE, icebergTbl); - return new IcebergProperty(properties); - } - - /** - * Get iceberg data file by file system table location and iceberg predicates - * @throws Exception - */ - public List getIcebergDataFiles(List predicates) throws Exception { - org.apache.iceberg.Table table = getTable(); - TableScan scan = table.newScan(); - for (Expression predicate : predicates) { - scan = scan.filter(predicate); - } - List relatedFiles = Lists.newArrayList(); - for (FileScanTask task : scan.planFiles()) { - Path path = Paths.get(task.file().path().toString()); - String relativePath = "/" + path.subpath(2, path.getNameCount()); - relatedFiles.add(new TBrokerFileStatus(relativePath, false, task.file().fileSizeInBytes(), false)); - } - return relatedFiles; - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - Text.writeString(out, icebergDb); - Text.writeString(out, icebergTbl); - - out.writeInt(icebergProperties.size()); - for (Map.Entry entry : icebergProperties.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - - icebergDb = Text.readString(in); - icebergTbl = Text.readString(in); - int size = in.readInt(); - for (int i = 0; i < size; i++) { - String key = Text.readString(in); - String value = Text.readString(in); - icebergProperties.put(key, value); - } - } - - @Override - public TTableDescriptor toThrift() { - TIcebergTable tIcebergTable = new TIcebergTable(getIcebergDb(), getIcebergTbl(), getIcebergProperties()); - TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE, - fullSchema.size(), 0, getName(), ""); - tTableDescriptor.setIcebergTable(tIcebergTable); - return tTableDescriptor; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java index c72d3ffb4db823..5caf826bfe89c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java @@ -17,12 +17,9 @@ package org.apache.doris.catalog; -import org.apache.doris.analysis.CreateTableStmt; -import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.RefreshCatalogStmt; import org.apache.doris.analysis.RefreshDbStmt; import org.apache.doris.analysis.RefreshTableStmt; -import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.common.DdlException; import org.apache.doris.common.ThreadPoolManager; @@ -30,7 +27,6 @@ import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalObjectLog; -import org.apache.doris.datasource.InternalCatalog; import org.apache.doris.qe.DdlExecutor; import com.google.common.collect.Maps; @@ -64,13 +60,8 @@ public void handleRefreshTable(RefreshTableStmt stmt) throws UserException { throw new DdlException("Catalog " + catalogName + " doesn't exist."); } - if (catalog.getName().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) { - // Process internal catalog iceberg external table refresh. - refreshInternalCtlIcebergTable(stmt, env); - } else { - // Process external catalog table refresh - env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName, false); - } + // Process external catalog table refresh + env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName, false); LOG.info("Successfully refresh table: {} from db: {}", tableName, dbName); } @@ -84,42 +75,11 @@ public void handleRefreshDb(RefreshDbStmt stmt) throws DdlException { throw new DdlException("Catalog " + catalogName + " doesn't exist."); } - if (catalog.getName().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) { - // Process internal catalog iceberg external db refresh. - refreshInternalCtlIcebergDb(dbName, env); - } else { - // Process external catalog db refresh - refreshExternalCtlDb(dbName, catalog, stmt.isInvalidCache()); - } + // Process external catalog db refresh + refreshExternalCtlDb(dbName, catalog, stmt.isInvalidCache()); LOG.info("Successfully refresh db: {}", dbName); } - private void refreshInternalCtlIcebergDb(String dbName, Env env) throws DdlException { - Database db = env.getInternalCatalog().getDbOrDdlException(dbName); - - // 0. build iceberg property - // Since we have only persisted database properties with key-value format in DatabaseProperty, - // we build IcebergProperty here, before checking database type. - db.getDbProperties().checkAndBuildProperties(); - // 1. check database type - if (!db.getDbProperties().getIcebergProperty().isExist()) { - throw new DdlException("Only support refresh Iceberg database."); - } - - // 2. only drop iceberg table in the database - // Current database may have other types of table, which is not allowed to drop. - for (Table table : db.getTables()) { - if (table instanceof IcebergTable) { - DropTableStmt dropTableStmt = - new DropTableStmt(true, new TableName(null, dbName, table.getName()), true); - env.dropTable(dropTableStmt); - } - } - - // 3. register iceberg database to recreate iceberg table - env.getIcebergTableCreationRecordMgr().registerDb(db); - } - private void refreshExternalCtlDb(String dbName, CatalogIf catalog, boolean invalidCache) throws DdlException { if (!(catalog instanceof ExternalCatalog)) { throw new DdlException("Only support refresh ExternalCatalog Database"); @@ -137,29 +97,6 @@ private void refreshExternalCtlDb(String dbName, CatalogIf catalog, boolean inva Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log); } - private void refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) throws UserException { - // 0. check table type - Database db = env.getInternalCatalog().getDbOrDdlException(stmt.getDbName()); - Table table = db.getTableNullable(stmt.getTblName()); - if (!(table instanceof IcebergTable)) { - throw new DdlException("Only support refresh Iceberg table."); - } - - // 1. get iceberg properties - Map icebergProperties = ((IcebergTable) table).getIcebergProperties(); - icebergProperties.put(IcebergProperty.ICEBERG_TABLE, ((IcebergTable) table).getIcebergTbl()); - icebergProperties.put(IcebergProperty.ICEBERG_DATABASE, ((IcebergTable) table).getIcebergDb()); - - // 2. drop old table - DropTableStmt dropTableStmt = new DropTableStmt(true, stmt.getTableName(), true); - env.dropTable(dropTableStmt); - - // 3. create new table - CreateTableStmt createTableStmt = new CreateTableStmt(true, true, - stmt.getTableName(), "ICEBERG", icebergProperties, ""); - env.createTable(createTableStmt); - } - public void addToRefreshMap(long catalogId, Integer[] sec) { refreshMap.put(catalogId, sec); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 30b0c9da4b2c2d..b14e1444eccec9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -26,7 +26,6 @@ import org.apache.doris.common.util.QueryableReentrantReadWriteLock; import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; -import org.apache.doris.external.hudi.HudiTable; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; @@ -377,10 +376,6 @@ public static Table read(DataInput in) throws IOException { table = new EsTable(); } else if (type == TableType.HIVE) { table = new HiveTable(); - } else if (type == TableType.ICEBERG) { - table = new IcebergTable(); - } else if (type == TableType.HUDI) { - table = new HudiTable(); } else if (type == TableType.JDBC) { table = new JdbcTable(); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 5b94e7529d0e1f..d0ba1f5a944085 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -69,7 +69,6 @@ import org.apache.doris.catalog.EsTable; import org.apache.doris.catalog.HashDistributionInfo; import org.apache.doris.catalog.HiveTable; -import org.apache.doris.catalog.IcebergTable; import org.apache.doris.catalog.Index; import org.apache.doris.catalog.InfoSchemaDb; import org.apache.doris.catalog.JdbcTable; @@ -135,8 +134,6 @@ import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.external.elasticsearch.EsRepository; -import org.apache.doris.external.iceberg.IcebergCatalogMgr; -import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr; import org.apache.doris.persist.AlterDatabasePropertyInfo; import org.apache.doris.persist.AutoIncrementIdUpdateLog; import org.apache.doris.persist.ColocatePersistInfo; @@ -212,8 +209,6 @@ public class InternalCatalog implements CatalogIf { // Add transient to fix gson issue. @Getter private transient EsRepository esRepository = new EsRepository(); - @Getter - private IcebergTableCreationRecordMgr icebergTableCreationRecordMgr = new IcebergTableCreationRecordMgr(); public InternalCatalog() { // create internal databases @@ -420,7 +415,7 @@ public void createDb(CreateDbStmt stmt) throws DdlException { Database db = new Database(id, fullDbName); db.setClusterName(SystemInfoService.DEFAULT_CLUSTER); // check and analyze database properties before create database - db.setDbProperties(new DatabaseProperty(properties).checkAndBuildProperties()); + db.setDbProperties(new DatabaseProperty(properties)); if (!tryLock(false)) { throw new DdlException("Failed to acquire catalog lock. Try again"); @@ -441,11 +436,6 @@ public void createDb(CreateDbStmt stmt) throws DdlException { unlock(); } LOG.info("createDb dbName = " + fullDbName + ", id = " + id); - - // create tables in iceberg database - if (db.getDbProperties().getIcebergProperty().isExist()) { - icebergTableCreationRecordMgr.registerDb(db); - } } /** @@ -557,10 +547,6 @@ public void dropDb(DropDbStmt stmt) throws DdlException { } public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay, long recycleTime) { - // drop Iceberg database table creation records - if (db.getDbProperties().getIcebergProperty().isExist()) { - icebergTableCreationRecordMgr.deregisterDb(db); - } for (Table table : db.getTables()) { unprotectDropTable(db, table, isForeDrop, isReplay, recycleTime); } @@ -943,9 +929,6 @@ public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, // drop all temp partitions of this table, so that there is no temp partitions in recycle bin, // which make things easier. ((OlapTable) table).dropAllTempPartitions(); - } else if (table.getType() == TableType.ICEBERG) { - // drop Iceberg database table creation record - icebergTableCreationRecordMgr.deregisterTable(db, (IcebergTable) table); } else if (table.getType() == TableType.MATERIALIZED_VIEW) { Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) table); } @@ -1123,9 +1106,6 @@ public void createTable(CreateTableStmt stmt) throws UserException { } else if (engineName.equalsIgnoreCase("hive")) { createHiveTable(db, stmt); return; - } else if (engineName.equalsIgnoreCase("iceberg")) { - IcebergCatalogMgr.createIcebergTable(db, stmt); - return; } else if (engineName.equalsIgnoreCase("jdbc")) { createJdbcTable(db, stmt); return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/ExternalScanRange.java b/fe/fe-core/src/main/java/org/apache/doris/external/ExternalScanRange.java deleted file mode 100644 index 52cfaa81c237d8..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/external/ExternalScanRange.java +++ /dev/null @@ -1,26 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external; - -/** - * Used to describe the data information that ExternalScanNode needs to read external catalogs. - * For example, for hive, the ExternalScanRange may save the file info which need to be read, - * such as file path, file format, start and offset, etc. - */ -public class ExternalScanRange { -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java b/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java deleted file mode 100644 index 80ab48d58b59da..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/external/hudi/HudiTable.java +++ /dev/null @@ -1,110 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external.hudi; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Table; -import org.apache.doris.common.io.Text; -import org.apache.doris.thrift.THudiTable; -import org.apache.doris.thrift.TTableDescriptor; -import org.apache.doris.thrift.TTableType; - -import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -@Deprecated -public class HudiTable extends Table { - private static final Logger LOG = LogManager.getLogger(HudiTable.class); - - public static final String HUDI_DATABASE = "hudi.database"; - public static final String HUDI_TABLE = "hudi.table"; - - // table properties of this hudi table - private Map tableProperties = Maps.newHashMap(); - // remote Hudi database name in hive metastore - private String hmsDatabaseName; - // remote Hudi table name in hive metastore - private String hmsTableName; - - public HudiTable() { - super(TableType.HUDI); - } - - /** - * Generate a Hudi Table with id, name, schema, properties. - * - * @param id table id - * @param tableName table name - * @param fullSchema table's schema - * @param tableProperties table's properties - */ - public HudiTable(long id, String tableName, List fullSchema, Map tableProperties) { - super(id, tableName, TableType.HUDI, fullSchema); - this.tableProperties = tableProperties; - this.hmsDatabaseName = tableProperties.get(HUDI_DATABASE); - this.hmsTableName = tableProperties.get(HUDI_TABLE); - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - Text.writeString(out, hmsDatabaseName); - Text.writeString(out, hmsTableName); - - out.writeInt(tableProperties.size()); - for (Map.Entry entry : tableProperties.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); - } - } - - @Override - public void readFields(DataInput in) throws IOException { - super.readFields(in); - - hmsDatabaseName = Text.readString(in); - hmsTableName = Text.readString(in); - int size = in.readInt(); - for (int i = 0; i < size; i++) { - String key = Text.readString(in); - String value = Text.readString(in); - tableProperties.put(key, value); - } - } - - @Override - public TTableDescriptor toThrift() { - THudiTable thriftHudiTable = new THudiTable(); - thriftHudiTable.setDbName(hmsDatabaseName); - thriftHudiTable.setTableName(hmsTableName); - thriftHudiTable.setProperties(tableProperties); - - TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE, - fullSchema.size(), 0, getName(), ""); - thriftTableDescriptor.setHudiTable(thriftHudiTable); - return thriftTableDescriptor; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/DorisIcebergException.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/DorisIcebergException.java deleted file mode 100644 index 6c5748091a7e8d..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/DorisIcebergException.java +++ /dev/null @@ -1,32 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external.iceberg; - -/** - * Exception class for Iceberg in Doris - */ -public class DorisIcebergException extends RuntimeException { - - public DorisIcebergException(String message) { - super(message); - } - - public DorisIcebergException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/HiveCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/HiveCatalog.java deleted file mode 100644 index 90eb013b685e2c..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/HiveCatalog.java +++ /dev/null @@ -1,91 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external.iceberg; - -import org.apache.doris.catalog.IcebergProperty; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * HiveCatalog of Iceberg - */ -public class HiveCatalog implements IcebergCatalog { - private static final Logger LOG = LogManager.getLogger(HiveCatalog.class); - - private org.apache.iceberg.hive.HiveCatalog hiveCatalog; - - public HiveCatalog() { - hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); - } - - @Override - public void initialize(IcebergProperty icebergProperty) { - // set hadoop conf - Configuration conf = new HdfsConfiguration(); - for (Map.Entry entry : icebergProperty.getDfsProperties().entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - hiveCatalog.setConf(conf); - // initialize hive catalog - Map catalogProperties = new HashMap<>(); - catalogProperties.put("uri", icebergProperty.getHiveMetastoreUris()); - hiveCatalog.initialize("hive", catalogProperties); - } - - @Override - public boolean tableExists(TableIdentifier tableIdentifier) { - return hiveCatalog.tableExists(tableIdentifier); - } - - @Override - public Table loadTable(TableIdentifier tableIdentifier) throws DorisIcebergException { - try { - return hiveCatalog.loadTable(tableIdentifier); - } catch (Exception e) { - LOG.warn("Failed to load table[{}] from database[{}], with error: {}", - tableIdentifier.name(), tableIdentifier.namespace(), e.getMessage()); - throw new DorisIcebergException(String.format("Failed to load table[%s] from database[%s]", - tableIdentifier.name(), tableIdentifier.namespace()), e); - } - } - - @Override - public List listTables(String db) throws DorisIcebergException { - try { - return hiveCatalog.listTables(Namespace.of(db)); - } catch (Exception e) { - LOG.warn("Failed to list table in database[{}], with error: {}", db, e.getMessage()); - throw new DorisIcebergException(String.format("Failed to list table in database[%s]", db), e); - } - } - - @Override - public boolean databaseExists(String db) { - return hiveCatalog.namespaceExists(Namespace.of(db)); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalog.java deleted file mode 100644 index ccb74026777a79..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalog.java +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external.iceberg; - - -import org.apache.doris.catalog.IcebergProperty; - -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; - -import java.util.List; - -/** - * A Catalog API for iceberg table and namespace. - */ -public interface IcebergCatalog { - /** - * Initialize a catalog given a map of catalog properties. - * @param icebergProperty - */ - default void initialize(IcebergProperty icebergProperty) { - } - - /** - * Check whether table exists. - * @param tableIdentifier - */ - default boolean tableExists(TableIdentifier tableIdentifier) { - return false; - } - - /** - * Load a table - * @param tableIdentifier - */ - Table loadTable(TableIdentifier tableIdentifier) throws DorisIcebergException; - - /** - * Return all the identifiers under this db. - * @param db - */ - List listTables(String db) throws DorisIcebergException; - - /** - * Checks whether the database exists. - * - * @param db - */ - default boolean databaseExists(String db) { - return false; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java deleted file mode 100644 index c219c0fade1f1a..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergCatalogMgr.java +++ /dev/null @@ -1,220 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external.iceberg; - -import org.apache.doris.analysis.CreateTableStmt; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.IcebergProperty; -import org.apache.doris.catalog.IcebergTable; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.SystemIdGenerator; -import org.apache.doris.external.iceberg.util.IcebergUtils; - -import com.google.common.base.Enums; -import com.google.common.base.Strings; -import com.google.common.collect.Maps; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Iceberg catalog manager - */ -public class IcebergCatalogMgr { - private static final Logger LOG = LogManager.getLogger(IcebergCatalogMgr.class); - - private static final String PROPERTY_MISSING_MSG = "Iceberg %s is null. " - + "Please add properties('%s'='xxx') when create iceberg database."; - - // hive metastore uri -> iceberg catalog - // used to cache iceberg catalogs - private static final ConcurrentHashMap metastoreUriToCatalog = new ConcurrentHashMap(); - - // TODO:(qjl) We'll support more types of Iceberg catalog. - public enum CatalogType { - HIVE_CATALOG - } - - public static IcebergCatalog getCatalog(IcebergProperty icebergProperty) throws DdlException { - String uri = icebergProperty.getHiveMetastoreUris(); - if (!metastoreUriToCatalog.containsKey(uri)) { - metastoreUriToCatalog.put(uri, createCatalog(icebergProperty)); - } - return metastoreUriToCatalog.get(uri); - } - - private static IcebergCatalog createCatalog(IcebergProperty icebergProperty) throws DdlException { - CatalogType type = CatalogType.valueOf(icebergProperty.getCatalogType()); - IcebergCatalog catalog; - switch (type) { - case HIVE_CATALOG: - catalog = new HiveCatalog(); - break; - default: - throw new DdlException("Unsupported catalog type: " + type); - } - catalog.initialize(icebergProperty); - return catalog; - } - - public static void validateProperties(Map properties, boolean isTable) throws DdlException { - if (properties.size() == 0) { - throw new DdlException("Please set properties of iceberg, " - + "they are: iceberg.database and 'iceberg.hive.metastore.uris'"); - } - - Map copiedProps = Maps.newHashMap(properties); - String icebergDb = copiedProps.get(IcebergProperty.ICEBERG_DATABASE); - if (Strings.isNullOrEmpty(icebergDb)) { - throw new DdlException(String.format(PROPERTY_MISSING_MSG, - IcebergProperty.ICEBERG_DATABASE, IcebergProperty.ICEBERG_DATABASE)); - } - copiedProps.remove(IcebergProperty.ICEBERG_DATABASE); - - // check hive properties - // hive.metastore.uris - String hiveMetastoreUris = copiedProps.get(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS); - if (Strings.isNullOrEmpty(hiveMetastoreUris)) { - throw new DdlException(String.format(PROPERTY_MISSING_MSG, - IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, IcebergProperty.ICEBERG_HIVE_METASTORE_URIS)); - } - copiedProps.remove(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS); - - // check iceberg catalog type - String icebergCatalogType = copiedProps.get(IcebergProperty.ICEBERG_CATALOG_TYPE); - if (Strings.isNullOrEmpty(icebergCatalogType)) { - icebergCatalogType = IcebergCatalogMgr.CatalogType.HIVE_CATALOG.name(); - properties.put(IcebergProperty.ICEBERG_CATALOG_TYPE, icebergCatalogType); - } else { - copiedProps.remove(IcebergProperty.ICEBERG_CATALOG_TYPE); - } - - if (!Enums.getIfPresent(IcebergCatalogMgr.CatalogType.class, icebergCatalogType).isPresent()) { - throw new DdlException("Unknown catalog type: " + icebergCatalogType - + ". Current only support HiveCatalog."); - } - - // only check table property when it's an iceberg table - if (isTable) { - String icebergTbl = copiedProps.get(IcebergProperty.ICEBERG_TABLE); - if (Strings.isNullOrEmpty(icebergTbl)) { - throw new DdlException(String.format(PROPERTY_MISSING_MSG, - IcebergProperty.ICEBERG_TABLE, IcebergProperty.ICEBERG_TABLE)); - } - copiedProps.remove(IcebergProperty.ICEBERG_TABLE); - } - - if (!copiedProps.isEmpty()) { - Iterator> iter = copiedProps.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); - if (entry.getKey().startsWith(IcebergProperty.ICEBERG_HDFS_PREFIX)) { - iter.remove(); - } - } - } - - if (!copiedProps.isEmpty()) { - throw new DdlException("Unknown table properties: " + copiedProps.toString()); - } - } - - /** - * Get Doris IcebergTable from remote Iceberg by database and table - * @param tableId table id in Doris - * @param tableName table name in Doris - * @param icebergProperty Iceberg property - * @param identifier Iceberg table identifier - * @param isTable - * @return IcebergTable in Doris - * @throws DdlException - */ - public static IcebergTable getTableFromIceberg(long tableId, String tableName, IcebergProperty icebergProperty, - TableIdentifier identifier, - boolean isTable) throws DdlException { - IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); - - if (isTable && !icebergCatalog.tableExists(identifier)) { - throw new DdlException(String.format("Table [%s] dose not exist in Iceberg.", identifier.toString())); - } - - // get iceberg table schema - org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(identifier); - - // covert iceberg table schema to Doris's - List columns = IcebergUtils.createSchemaFromIcebergSchema(icebergTable.schema()); - - // create new iceberg table in doris - IcebergTable table = new IcebergTable(tableId, tableName, columns, icebergProperty, icebergTable); - - return table; - - } - - /** - * create iceberg table in Doris - * - * 1. check table existence in Iceberg - * 2. get table schema from Iceberg - * 3. convert Iceberg table schema to Doris table schema - * 4. create associate table in Doris - * - * @param db - * @param stmt - * @throws DdlException - */ - public static void createIcebergTable(Database db, CreateTableStmt stmt) throws DdlException { - String tableName = stmt.getTableName(); - Map properties = stmt.getProperties(); - - // validate iceberg table properties - validateProperties(properties, true); - IcebergProperty icebergProperty = new IcebergProperty(properties); - - String icebergDb = icebergProperty.getDatabase(); - String icebergTbl = icebergProperty.getTable(); - - // create iceberg table struct - // 1. Already set column def in Create Stmt, just create table - // 2. No column def in Create Stmt, get it from remote Iceberg schema. - IcebergTable table; - long tableId = SystemIdGenerator.getNextId(); - if (stmt.getColumns().size() > 0) { - // set column def in CREATE TABLE - table = new IcebergTable(tableId, tableName, stmt.getColumns(), icebergProperty, null); - } else { - // get column def from remote Iceberg - table = getTableFromIceberg(tableId, tableName, icebergProperty, - TableIdentifier.of(icebergDb, icebergTbl), true); - } - - // check iceberg table if exists in doris database - if (!db.createTableWithLock(table, false, stmt.isSetIfNotExists()).first) { - ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); - } - LOG.info("successfully create table[{}-{}]", tableName, table.getId()); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecord.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecord.java deleted file mode 100644 index 125818186a38fd..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecord.java +++ /dev/null @@ -1,88 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external.iceberg; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.List; - -/** - * Represents the record of Iceberg table automating creation in an Iceberg database - */ -public class IcebergTableCreationRecord { - private static final Logger LOG = LogManager.getLogger(IcebergTableCreationRecord.class); - - private long dbId; - private long tableId; - private String db; - private String table; - private String status; - private String createTime; - private String errorMsg; - - public IcebergTableCreationRecord(long dbId, long tableId, String db, String table, String status, - String createTime, String errorMsg) { - this.dbId = dbId; - this.tableId = tableId; - this.db = db; - this.table = table; - this.status = status; - this.createTime = createTime; - this.errorMsg = errorMsg; - } - - public List getTableCreationRecord() { - List record = new ArrayList<>(); - record.add(this.db); - record.add(this.table); - record.add(this.status); - record.add(this.createTime); - record.add(this.errorMsg); - return record; - } - - public long getDbId() { - return dbId; - } - - public long getTableId() { - return tableId; - } - - public String getDb() { - return db; - } - - public String getTable() { - return table; - } - - public String getStatus() { - return status; - } - - public String getCreateTime() { - return createTime; - } - - public String getErrorMsg() { - return errorMsg; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java deleted file mode 100644 index 8061166050d938..00000000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/IcebergTableCreationRecordMgr.java +++ /dev/null @@ -1,279 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.external.iceberg; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.IcebergProperty; -import org.apache.doris.catalog.IcebergTable; -import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.ErrorCode; -import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.SystemIdGenerator; -import org.apache.doris.common.property.PropertySchema; -import org.apache.doris.common.util.MasterDaemon; -import org.apache.doris.common.util.TimeUtils; - -import com.google.common.collect.Maps; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.PriorityQueue; -import java.util.Queue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * Manager for Iceberg automatic creation table records - * used to create iceberg tables and show table creation records - */ -public class IcebergTableCreationRecordMgr extends MasterDaemon { - private static final Logger LOG = LogManager.getLogger(IcebergTableCreationRecordMgr.class); - - private static final String SUCCESS = "success"; - private static final String FAIL = "fail"; - - // Iceberg databases, used to list remote iceberg tables - // dbId -> database - private final Map icebergDbs = new ConcurrentHashMap<>(); - // database -> table identifier -> properties - // used to create table - private final Map> dbToTableIdentifiers = Maps.newConcurrentMap(); - // table creation records, used for show stmt - // dbId -> tableId -> create msg - private final Map> dbToTableToCreationRecord = Maps.newConcurrentMap(); - - private final Queue tableCreationRecordQueue - = new PriorityQueue<>(new TableCreationComparator()); - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - - - public IcebergTableCreationRecordMgr() { - super("iceberg_table_creation_record_mgr", Config.iceberg_table_creation_interval_second * 1000); - } - - public void registerDb(Database db) throws DdlException { - long dbId = db.getId(); - icebergDbs.put(dbId, db); - LOG.info("Register a new Iceberg database[{}-{}]", dbId, db.getFullName()); - } - - private void registerTable(Database db, TableIdentifier identifier, IcebergProperty icebergProperty) { - if (dbToTableIdentifiers.containsKey(db)) { - dbToTableIdentifiers.get(db).put(identifier, icebergProperty); - } else { - Map identifierToProperties = Maps.newConcurrentMap(); - identifierToProperties.put(identifier, icebergProperty); - dbToTableIdentifiers.put(db, identifierToProperties); - } - LOG.info("Register a new table[{}] to database[{}]", identifier.name(), db.getFullName()); - } - - public void deregisterDb(Database db) { - icebergDbs.remove(db.getId()); - dbToTableIdentifiers.remove(db); - dbToTableToCreationRecord.remove(db.getId()); - LOG.info("Deregister database[{}-{}]", db.getFullName(), db.getId()); - } - - public void deregisterTable(Database db, IcebergTable table) { - if (dbToTableIdentifiers.containsKey(db)) { - TableIdentifier identifier = TableIdentifier.of(table.getIcebergDb(), table.getIcebergTbl()); - Map identifierToProperties = dbToTableIdentifiers.get(db); - identifierToProperties.remove(identifier); - } - if (dbToTableToCreationRecord.containsKey(db.getId())) { - Map recordMap = dbToTableToCreationRecord.get(db.getId()); - recordMap.remove(table.getId()); - } - LOG.info("Deregister table[{}-{}] from database[{}-{}]", table.getName(), - table.getId(), db.getFullName(), db.getId()); - } - - // remove already created tables or failed tables - private void removeDuplicateTables() { - for (Map.Entry> entry : dbToTableToCreationRecord.entrySet()) { - Env.getCurrentInternalCatalog().getDb(entry.getKey()).ifPresent(db -> { - if (dbToTableIdentifiers.containsKey(db)) { - for (Map.Entry innerEntry : entry.getValue().entrySet()) { - String tableName = innerEntry.getValue().getTable(); - String icebergDbName = db.getDbProperties().getIcebergProperty().getDatabase(); - TableIdentifier identifier = TableIdentifier.of(icebergDbName, tableName); - dbToTableIdentifiers.get(db).remove(identifier); - } - } - }); - } - } - - @Override - protected void runAfterCatalogReady() { - PropertySchema.DateProperty prop = - new PropertySchema.DateProperty("key", TimeUtils.DATETIME_FORMAT); - // list iceberg tables in dbs - // When listing table is done, remove database from icebergDbs. - for (Iterator> it = icebergDbs.entrySet().iterator(); it.hasNext(); it.remove()) { - Map.Entry entry = it.next(); - Database db = entry.getValue(); - IcebergProperty icebergProperty = db.getDbProperties().getIcebergProperty(); - IcebergCatalog icebergCatalog = null; - try { - icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty); - } catch (DdlException e) { - addTableCreationRecord(db.getId(), -1, db.getFullName(), "", FAIL, - prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage()); - LOG.warn("Failed get Iceberg catalog, hive.metastore.uris[{}], error: {}", - icebergProperty.getHiveMetastoreUris(), e.getMessage()); - } - List icebergTables = null; - try { - icebergTables = icebergCatalog.listTables(icebergProperty.getDatabase()); - - } catch (Exception e) { - addTableCreationRecord(db.getId(), -1, db.getFullName(), "", FAIL, - prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage()); - LOG.warn("Failed list remote Iceberg database, hive.metastore.uris[{}], database[{}], error: {}", - icebergProperty.getHiveMetastoreUris(), icebergProperty.getDatabase(), e.getMessage()); - } - for (TableIdentifier identifier : icebergTables) { - IcebergProperty tableProperties = new IcebergProperty(icebergProperty); - tableProperties.setTable(identifier.name()); - registerTable(db, identifier, tableProperties); - } - } - - // create table in Doris - for (Map.Entry> entry : dbToTableIdentifiers.entrySet()) { - Database db = entry.getKey(); - for (Map.Entry innerEntry : entry.getValue().entrySet()) { - TableIdentifier identifier = innerEntry.getKey(); - IcebergProperty icebergProperty = innerEntry.getValue(); - long tableId = SystemIdGenerator.getNextId(); - try { - // get doris table from iceberg - IcebergTable table = IcebergCatalogMgr.getTableFromIceberg(tableId, identifier.name(), - icebergProperty, identifier, false); - // check iceberg table if exists in doris database - if (!db.createTableWithLock(table, false, false).first) { - ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, table.getName()); - } - addTableCreationRecord(db.getId(), tableId, db.getFullName(), table.getName(), SUCCESS, - prop.writeTimeFormat(new Date(System.currentTimeMillis())), ""); - LOG.info("Successfully create table[{}-{}]", table.getName(), tableId); - } catch (Exception e) { - addTableCreationRecord(db.getId(), tableId, db.getFullName(), identifier.name(), FAIL, - prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage()); - LOG.warn("Failed create table[{}], error: {}", identifier.name(), e.getMessage()); - } - } - } - removeDuplicateTables(); - } - - private void addTableCreationRecord(long dbId, long tableId, String db, String table, String status, - String createTime, String errorMsg) { - writeLock(); - try { - while (isQueueFull()) { - IcebergTableCreationRecord record = tableCreationRecordQueue.poll(); - if (record != null) { - Map tableRecords - = dbToTableToCreationRecord.get(record.getDbId()); - Iterator> tableRecordsIterator - = tableRecords.entrySet().iterator(); - while (tableRecordsIterator.hasNext()) { - long t = tableRecordsIterator.next().getKey(); - if (t == record.getTableId()) { - tableRecordsIterator.remove(); - break; - } - } - } - } - - IcebergTableCreationRecord record = new IcebergTableCreationRecord(dbId, tableId, db, table, status, - createTime, errorMsg); - tableCreationRecordQueue.offer(record); - - if (!dbToTableToCreationRecord.containsKey(dbId)) { - dbToTableToCreationRecord.put(dbId, new ConcurrentHashMap<>()); - } - Map tableToRecord = dbToTableToCreationRecord.get(dbId); - if (!tableToRecord.containsKey(tableId)) { - tableToRecord.put(tableId, record); - } - } finally { - writeUnlock(); - } - } - - public List getTableCreationRecordByDbId(long dbId) { - List records = new ArrayList<>(); - - readLock(); - try { - if (!dbToTableToCreationRecord.containsKey(dbId)) { - return records; - } - Map tableToRecords = dbToTableToCreationRecord.get(dbId); - for (Map.Entry entry : tableToRecords.entrySet()) { - records.add(entry.getValue()); - } - - return records; - } finally { - readUnlock(); - } - } - - class TableCreationComparator implements Comparator { - @Override - public int compare(IcebergTableCreationRecord r1, IcebergTableCreationRecord r2) { - return r1.getCreateTime().compareTo(r2.getCreateTime()); - } - } - - public boolean isQueueFull() { - return tableCreationRecordQueue.size() >= 2000; - } - - private void readLock() { - lock.readLock().lock(); - } - - private void readUnlock() { - lock.readLock().unlock(); - } - - private void writeLock() { - lock.writeLock().lock(); - } - - private void writeUnlock() { - lock.writeLock().unlock(); - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java index 6b26616b90b19f..4c3ad20a3e1a1f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java @@ -21,7 +21,6 @@ import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.BoolLiteral; import org.apache.doris.analysis.CastExpr; -import org.apache.doris.analysis.ColumnDef; import org.apache.doris.analysis.CompoundPredicate; import org.apache.doris.analysis.DateLiteral; import org.apache.doris.analysis.DecimalLiteral; @@ -34,39 +33,18 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.Subquery; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Type; -import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.UserException; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.thrift.TExprOpcode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import org.apache.iceberg.CombinedScanTask; -import org.apache.iceberg.MetadataTableType; -import org.apache.iceberg.MetadataTableUtils; -import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.TableOperations; -import org.apache.iceberg.TableScan; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.transforms.PartitionSpecVisitor; -import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; /** * Iceberg utils @@ -81,147 +59,6 @@ public Integer initialValue() { }; static long MILLIS_TO_NANO_TIME = 1000; - /** - * Create Iceberg schema from Doris ColumnDef. - * - * @param columnDefs columns for create iceberg table - * @return Iceberg schema - * @throws UserException if has aggregate type in create table statement - */ - public static Schema createIcebergSchema(List columnDefs) throws UserException { - columnIdThreadLocal.set(1); - List nestedFields = Lists.newArrayList(); - for (ColumnDef columnDef : columnDefs) { - columnDef.analyze(false); - if (columnDef.getAggregateType() != null) { - throw new DdlException("Do not support aggregation column: " + columnDef.getName()); - } - boolean isNullable = columnDef.isAllowNull(); - org.apache.iceberg.types.Type icebergType = convertDorisToIceberg(columnDef.getType()); - if (isNullable) { - nestedFields.add( - Types.NestedField.optional(nextId(), columnDef.getName(), icebergType, columnDef.getComment())); - } else { - nestedFields.add( - Types.NestedField.required(nextId(), columnDef.getName(), icebergType, columnDef.getComment())); - } - } - return new Schema(nestedFields); - } - - public static List createSchemaFromIcebergSchema(Schema schema) throws DdlException { - List columns = Lists.newArrayList(); - for (Types.NestedField nestedField : schema.columns()) { - try { - columns.add(nestedFieldToColumn(nestedField)); - } catch (UnsupportedOperationException e) { - if (Config.iceberg_table_creation_strict_mode) { - throw e; - } - LOG.warn("Unsupported data type in Doris, ignore column[{}], with error: {}", - nestedField.name(), e.getMessage()); - continue; - } - } - return columns; - } - - public static Column nestedFieldToColumn(Types.NestedField field) { - Type type = convertIcebergToDoris(field.type()); - return new Column(field.name(), type, true, null, field.isOptional(), null, field.doc()); - } - - /** - * get iceberg table schema id to name mapping - * - * @param schema iceberg table schema - * @return id to name mapping - */ - public static Map getIdToName(Schema schema) { - Map idToName = new HashMap<>(); - for (Types.NestedField nestedField : schema.columns()) { - idToName.put(nestedField.fieldId(), nestedField.name()); - } - return idToName; - } - - public static List getIdentityPartitionField(PartitionSpec spec) { - return PartitionSpecVisitor.visit(spec, - new PartitionSpecVisitor() { - @Override - public String identity(String sourceName, int sourceId) { - return sourceName; - } - - @Override - public String bucket(String sourceName, int sourceId, int numBuckets) { - return null; - } - - @Override - public String truncate(String sourceName, int sourceId, int width) { - return null; - } - - @Override - public String year(String sourceName, int sourceId) { - return null; - } - - @Override - public String month(String sourceName, int sourceId) { - return null; - } - - @Override - public String day(String sourceName, int sourceId) { - return null; - } - - @Override - public String hour(String sourceName, int sourceId) { - return null; - } - - @Override - public String alwaysNull(int fieldId, String sourceName, int sourceId) { - return null; - } - - @Override - public String unknown(int fieldId, String sourceName, int sourceId, String transform) { - return null; - } - } - ).stream().filter(Objects::nonNull).collect(Collectors.toList()); - } - - /** - * Convert a {@link org.apache.iceberg.types.Type} to a {@link Type doris type}. - * - * @param type a iceberg Type - * @return the equivalent doris type - * @throws IllegalArgumentException if the type cannot be converted to doris - */ - public static Type convertIcebergToDoris(org.apache.iceberg.types.Type type) { - return TypeUtil.visit(type, new TypeToDorisType()); - } - - /** - * Convert a doris {@link Type struct} to a {@link org.apache.iceberg.types.Type} with new field ids. - *

- * This conversion assigns fresh ids. - *

- * Some data types are represented as the same doris type. These are converted to a default type. - * - * @param type a doris Type - * @return the equivalent Type - * @throws IllegalArgumentException if the type cannot be converted - */ - public static org.apache.iceberg.types.Type convertDorisToIceberg(Type type) { - return DorisTypeVisitor.visit(type, new DorisTypeToType()); - } - public static Expression convertToIcebergExpr(Expr expr, Schema schema) { if (expr == null) { return null; @@ -397,45 +234,4 @@ private static SlotRef convertDorisExprToSlotRef(Expr expr) { } return slotRef; } - - private static int findWidth(IntLiteral literal) { - Preconditions.checkArgument(literal.getValue() > 0 && literal.getValue() < Integer.MAX_VALUE, - "Unsupported width " + literal.getValue()); - return (int) literal.getValue(); - } - - public static int nextId() { - int nextId = columnIdThreadLocal.get(); - columnIdThreadLocal.set(nextId + 1); - return nextId; - } - - public static Set getAllDataFilesPath(org.apache.iceberg.Table table, TableOperations ops) { - org.apache.iceberg.Table dataFilesTable = MetadataTableUtils.createMetadataTableInstance( - ops, table.name(), table.name(), MetadataTableType.ALL_DATA_FILES); - - Set dataFilesPath = Sets.newHashSet(); - TableScan tableScan = dataFilesTable.newScan(); - List tasks = Lists.newArrayList(tableScan.planTasks()); - tasks.forEach(task -> - task.files().forEach(fileScanTask -> { - Lists.newArrayList(fileScanTask.asDataTask().rows()) - .forEach(row -> dataFilesPath.add(row.get(1, String.class))); - }) - ); - - return dataFilesPath; - } - - public static PartitionSpec buildPartitionSpec(Schema schema, List partitionNames) { - if (partitionNames == null || partitionNames.isEmpty()) { - return null; - } - PartitionSpec.Builder builder = PartitionSpec.builderFor(schema); - for (String partitionName : partitionNames) { - builder.identity(partitionName); - } - return builder.build(); - } - } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index 0c2718925ed7dc..10e2b2b04ac5e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -50,14 +50,7 @@ public interface FileSystem { Status makeDir(String remotePath); - default RemoteFiles listLocatedFiles(String remotePath) throws UserException { - return listLocatedFiles(remotePath, false, false); - } - - // Get files and directories located status, not only files - default RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException { - throw new UserException("Not support to listLocations."); - } + RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException; // List files in remotePath // The remote file name will only contain file name only(Not full path) diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java index d85273e55f08fa..1baaf9bd2f710c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java @@ -59,6 +59,11 @@ public Status makeDir(String remotePath) { throw new UnsupportedOperationException("Unsupported operation on local file system."); } + @Override + public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) { + throw new UnsupportedOperationException("Unsupported operation on local file system."); + } + @Override public Status list(String remotePath, List result, boolean fileNameOnly) { throw new UnsupportedOperationException("Unsupported operation on local file system."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java index 328247e814005e..5904033dec79b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFile.java @@ -45,10 +45,6 @@ public RemoteFile(String name, boolean isFile, long size, long blockSize, long m this(name, null, isFile, !isFile, size, blockSize, modificationTime, null); } - public RemoteFile(Path path, boolean isDirectory, long size, long blockSize, BlockLocation[] blockLocations) { - this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, 0, blockLocations); - } - public RemoteFile(Path path, boolean isDirectory, long size, long blockSize, long modificationTime, BlockLocation[] blockLocations) { this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, modificationTime, blockLocations); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index e1f79c6f6c5287..bb060c44fc5467 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -179,7 +179,6 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.datasource.HMSExternalCatalog; -import org.apache.doris.external.iceberg.IcebergTableCreationRecord; import org.apache.doris.job.task.AbstractTask; import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.ExportJobState; @@ -2495,21 +2494,8 @@ public void handleShowSqlBlockRule() throws AnalysisException { private void handleShowTableCreation() throws AnalysisException { ShowTableCreationStmt showStmt = (ShowTableCreationStmt) stmt; - String dbName = showStmt.getDbName(); - DatabaseIf db = ctx.getCurrentCatalog().getDbOrAnalysisException(dbName); - - List records = ctx.getEnv().getIcebergTableCreationRecordMgr() - .getTableCreationRecordByDbId(db.getId()); List> rowSet = Lists.newArrayList(); - for (IcebergTableCreationRecord record : records) { - List row = record.getTableCreationRecord(); - // like predicate - if (Strings.isNullOrEmpty(showStmt.getWild()) || showStmt.like(record.getTable())) { - rowSet.add(row); - } - } - // sort function rows by fourth column (Create Time) asc ListComparator> comparator = null; OrderByPair orderByPair = new OrderByPair(3, false); diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java index da9891c4e3fa01..fb4dad862b65d8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java @@ -91,6 +91,7 @@ public static void beforeClass() throws Exception { Config.disable_balance = true; Config.schedule_batch_size = 400; Config.schedule_slot_num_per_hdd_path = 100; + Config.enable_odbc_mysql_broker_table = true; UtFrameUtils.createDorisClusterWithMultiTag(runningDir, 5); List backends = Env.getCurrentSystemInfo().getIdToBackend().values().asList(); @@ -251,7 +252,7 @@ public static void tearDown() { } private static void createTable(String sql) throws Exception { - Config.enable_odbc_table = true; + Config.enable_odbc_mysql_broker_table = true; CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); Env.getCurrentEnv().createTable(createTableStmt); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateResourceStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateResourceStmtTest.java index 78defde01705ad..1cc0281d3f308e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateResourceStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateResourceStmtTest.java @@ -74,7 +74,7 @@ public void testNormal(@Mocked Env env, @Injectable AccessControllerManager acce properties = Maps.newHashMap(); properties.put("type", "odbc_catalog"); stmt = new CreateResourceStmt(true, false, resourceName2, properties); - Config.enable_odbc_table = true; + Config.enable_odbc_mysql_broker_table = true; stmt.analyze(analyzer); Assert.assertEquals(resourceName2, stmt.getResourceName()); Assert.assertEquals(Resource.ResourceType.ODBC_CATALOG, stmt.getResourceType()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java index d856599a3d7b9a..89b30f08a127fb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateTableStmtTest.java @@ -370,62 +370,6 @@ public void testBmpHllIncAgg() throws Exception { stmt.analyze(analyzer); } - @Test - public void testCreateIcebergTable() throws UserException { - Config.disable_iceberg_hudi_table = false; - Map properties = new HashMap<>(); - properties.put("iceberg.database", "doris"); - properties.put("iceberg.table", "test"); - properties.put("iceberg.hive.metastore.uris", "thrift://127.0.0.1:9087"); - CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "iceberg", properties, ""); - stmt.analyze(analyzer); - - Assert.assertEquals("CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n" + "\n" + ") ENGINE = iceberg\n" - + "PROPERTIES (\"iceberg.database\" = \"doris\",\n" - + "\"iceberg.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n" - + "\"iceberg.table\" = \"test\")", stmt.toString()); - } - - @Test - public void testCreateHudiTable() throws UserException { - Config.disable_iceberg_hudi_table = false; - Map properties = new HashMap<>(); - properties.put("hudi.database", "doris"); - properties.put("hudi.table", "test"); - properties.put("hudi.hive.metastore.uris", "thrift://127.0.0.1:9087"); - CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "hudi", properties, ""); - stmt.analyze(analyzer); - - Assert.assertEquals("CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n" + "\n" + ") ENGINE = hudi\n" - + "PROPERTIES (\"hudi.database\" = \"doris\",\n" - + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n" - + "\"hudi.table\" = \"test\")", - stmt.toString()); - } - - @Test - public void testCreateHudiTableWithSchema() throws UserException { - Config.disable_iceberg_hudi_table = false; - Map properties = new HashMap<>(); - properties.put("hudi.database", "doris"); - properties.put("hudi.table", "test"); - properties.put("hudi.hive.metastore.uris", "thrift://127.0.0.1:9087"); - CreateTableStmt stmt = new CreateTableStmt(false, true, tblName, "hudi", properties, ""); - ColumnDef idCol = new ColumnDef("id", TypeDef.create(PrimitiveType.INT)); - stmt.addColumnDef(idCol); - ColumnDef nameCol = new ColumnDef("name", TypeDef.create(PrimitiveType.INT), false, null, true, - ColumnDef.DefaultValue.NOT_SET, ""); - stmt.addColumnDef(nameCol); - stmt.analyze(analyzer); - - Assert.assertEquals( - "CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n" + " `id` INT NOT NULL COMMENT \"\",\n" - + " `name` INT NULL COMMENT \"\"\n" + ") ENGINE = hudi\n" - + "PROPERTIES (\"hudi.database\" = \"doris\",\n" - + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n" - + "\"hudi.table\" = \"test\")", stmt.toString()); - } - @Test public void testOdbcString() throws AnalysisException { ColumnDef col = new ColumnDef("string_col", TypeDef.create(PrimitiveType.STRING), true, null, true, diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java index 7b82f640747d46..8a737a768378fc 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableLikeTest.java @@ -20,6 +20,7 @@ import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableLikeStmt; import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.ExceptionChecker; import org.apache.doris.qe.ConnectContext; @@ -51,6 +52,7 @@ public static void beforeClass() throws Exception { // create connect context connectContext = UtFrameUtils.createDefaultCtx(); + Config.enable_odbc_mysql_broker_table = true; // create database String createDbStmtStr = "create database test;"; CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvOperationTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvOperationTest.java index d9afb49eb20c93..d6936433d401c5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvOperationTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/EnvOperationTest.java @@ -56,6 +56,7 @@ public static void beforeClass() throws Exception { String createDbStmtStr = "create database test;"; CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext); Env.getCurrentEnv().createDb(createDbStmt); + Config.enable_odbc_mysql_broker_table = true; createTable("create table test.renameTest\n" + "(k1 int,k2 int)\n" @@ -101,7 +102,6 @@ private static void createTable(String sql) throws Exception { } private static void createResource(String sql) throws Exception { - Config.enable_odbc_table = true; CreateResourceStmt createResourceStmt = (CreateResourceStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); Env.getCurrentEnv().getResourceMgr().createResource(createResourceStmt); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/OdbcCatalogResourceTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/OdbcCatalogResourceTest.java index d2ecea5145d1fd..a1c2e679829162 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/OdbcCatalogResourceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/OdbcCatalogResourceTest.java @@ -87,7 +87,7 @@ public void testFromStmt(@Mocked Env env, @Injectable AccessControllerManager ac // host: 127.0.0.1, port: 7777, without driver and odbc_type CreateResourceStmt stmt = new CreateResourceStmt(true, false, name, properties); - Config.enable_odbc_table = true; + Config.enable_odbc_mysql_broker_table = true; stmt.analyze(analyzer); OdbcCatalogResource resource = (OdbcCatalogResource) Resource.fromStmt(stmt); Assert.assertEquals(name, resource.getName()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index ea402ca40bef8f..c17d38718eaeda 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -67,6 +67,7 @@ protected void runBeforeAll() throws Exception { // create database createDatabase("test"); connectContext.getSessionVariable().setEnableNereidsPlanner(false); + Config.enable_odbc_mysql_broker_table = true; createTable("create table test.test1\n" + "(\n" diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java index 90283958bc09c7..359a99d27b757d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StatisticDeriveTest.java @@ -17,6 +17,7 @@ package org.apache.doris.planner; +import org.apache.doris.common.Config; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.utframe.TestWithFeService; @@ -27,6 +28,7 @@ public class StatisticDeriveTest extends TestWithFeService { @Override protected void runBeforeAll() throws Exception { + Config.enable_odbc_mysql_broker_table = true; // create database createDatabase("test"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 946e159529d22a..1400332911f0b6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -625,7 +625,7 @@ public void createTable(String sql) throws Exception { public void createTable(String sql, boolean enableNerieds) throws Exception { try { - Config.enable_odbc_table = true; + Config.enable_odbc_mysql_broker_table = true; createTables(enableNerieds, sql); } catch (Exception e) { e.printStackTrace(); diff --git a/regression-test/suites/correctness_p0/test_push_conjunct_external_table.groovy b/regression-test/suites/correctness_p0/test_push_conjunct_external_table.groovy index ad59c0f61bffa2..b4acbdfdaea110 100644 --- a/regression-test/suites/correctness_p0/test_push_conjunct_external_table.groovy +++ b/regression-test/suites/correctness_p0/test_push_conjunct_external_table.groovy @@ -18,6 +18,7 @@ suite("test_push_conjunct_external_table") { sql """set enable_nereids_planner=false;""" sql """ DROP TABLE IF EXISTS dim_server; """ + sql """ admin set frontend config("enable_odbc_mysql_broker_table" = "true")""" sql """ CREATE EXTERNAL TABLE `dim_server` (