diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 20e7ffbe1c5e0..478f5954c4de2 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
 import org.apache.hudi.common.table.timeline.TimelineUtils
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -211,8 +212,12 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
     // Save all the table config to the hoodie.properties.
     val properties = TypedProperties.fromMap(tableConfigs.asJava)
 
+    // Guard against a null or empty resolved database name by falling back
+    // to "default".
+    val databaseFromIdentifier = table.identifier.database.getOrElse(
+      spark.sessionState.catalog.getCurrentDatabase)
     val catalogDatabaseName = formatName(spark,
-      table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
+      if (StringUtils.isNullOrEmpty(databaseFromIdentifier)) "default" else databaseFromIdentifier)
     val (recordName, namespace) = HoodieSchemaConversionUtils.getRecordNameAndNamespace(table.identifier.table)
     val schema = SchemaConverters.toAvroType(dataSchema, nullable = false, recordName, namespace)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
index 93e54cbf7beb9..cf29c75edacaf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestCreateTable.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hudi.ddl
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StoragePath}
@@ -1913,6 +1914,72 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Table Init Without Database Name Defaults And Reads Successfully") {
+    withTempDir { tmp =>
+      val tablePath = tmp.getCanonicalPath
+      val tableName = generateTableName
+
+      // 1. Init the table WITHOUT a database name using the low-level API.
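+      // Not setting a database name means hoodie.database.name is left unset in
+      // hoodie.properties; the write and read below should still succeed.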
+      HoodieTableMetaClient.newTableBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(tableName)
+        .setRecordKeyFields("id")
+        .setOrderingFields("ts")
+        // IMPORTANT: NOT calling .setDatabaseName() here.
+        .initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath)
+
+      // 2. Insert records.
+      val data = Seq(
+        (1, "Alice", 100L),
+        (2, "Bob", 200L),
+        (3, "Charlie", 300L)
+      )
+      import spark.implicits._
+      data.toDF("id", "name", "ts")
+        .write
+        .format("hudi")
+        .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
+        .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+        .option(HoodieWriteConfig.TBL_NAME.key, tableName)
+        .mode(SaveMode.Append)
+        .save(tablePath)
+
+      // 3. Read back with Spark to validate that reads succeed.
+      val resultDf = spark.read.format("hudi").load(tablePath)
+      assertResult(3)(resultDf.count())
+    }
+  }
+
+  test("Test Create Table Via SQL Without Database Defaults To Default") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+
+      // Create the table without specifying a database (should default to the current database).
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           |tblproperties (
+           |  primaryKey = 'id',
+           |  orderingFields = 'ts'
+           |)
+           |location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+      // Insert data.
+      spark.sql(s"INSERT INTO $tableName VALUES (1, 'Alice', 100), (2, 'Bob', 200)")
+
+      // Read back.
+      val resultDf = spark.sql(s"SELECT * FROM $tableName")
+      assertResult(2)(resultDf.count())
+    }
+  }
+
   def writeAndValidateWithComplexKeyGenerator(spark: SparkSession,
                                               tableVersion: Int,
                                               tableName: String,