@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING
import org.apache.hudi.common.table.timeline.TimelineUtils
+import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -211,8 +212,10 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
// Save all the table config to the hoodie.properties.
val properties = TypedProperties.fromMap(tableConfigs.asJava)

+    val databaseFromIdentifier = table.identifier.database.getOrElse(
+      spark.sessionState.catalog.getCurrentDatabase)
     val catalogDatabaseName = formatName(spark,
-      table.identifier.database.getOrElse(spark.sessionState.catalog.getCurrentDatabase))
+      if (StringUtils.isNullOrEmpty(databaseFromIdentifier)) "default" else databaseFromIdentifier)

val (recordName, namespace) = HoodieSchemaConversionUtils.getRecordNameAndNamespace(table.identifier.table)
val schema = SchemaConverters.toAvroType(dataSchema, nullable = false, recordName, namespace)
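For context, a minimal sketch of the fallback the hunk above introduces. The object name, the resolveDatabase helper, and the sample values are illustrative only (not part of this PR); it assumes hudi-common's StringUtils is on the classpath. The idea: when the table identifier carries no database and the session reports a null or empty current database, the resolved name now falls back to "default" instead of an empty string.

// Illustrative sketch mirroring the new fallback in HoodieCatalogTable; names and values are hypothetical.
import org.apache.hudi.common.util.StringUtils

object DatabaseFallbackSketch {
  // identifierDb: database from the table identifier, if any
  // currentDb: whatever the session catalog reports as the current database
  def resolveDatabase(identifierDb: Option[String], currentDb: String): String = {
    val databaseFromIdentifier = identifierDb.getOrElse(currentDb)
    if (StringUtils.isNullOrEmpty(databaseFromIdentifier)) "default" else databaseFromIdentifier
  }

  def main(args: Array[String]): Unit = {
    println(resolveDatabase(Some("sales"), "warehouse")) // sales
    println(resolveDatabase(None, ""))                   // default (instead of an empty database name)
  }
}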
@@ -20,9 +20,10 @@ package org.apache.spark.sql.hudi.ddl
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, WriteOperationType}
-import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.storage.{HoodieStorage, HoodieStorageUtils, StoragePath}
@@ -1913,6 +1914,70 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
}
}

test("Test Table Init Without Database Name Defaults And Reads Successfully") {
withTempDir { tmp =>
val tablePath = tmp.getCanonicalPath
val tableName = generateTableName

// 1. Init table WITHOUT database name using low-level API
HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(tableName)
.setRecordKeyFields("id")
.setOrderingFields("ts")
// IMPORTANT: NOT calling .setDatabaseName() here.
.initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath)

// 2. Insert records.
val data = Seq(
(1, "Alice", 100L),
(2, "Bob", 200L),
(3, "Charlie", 300L)
)
import spark.implicits._
data.toDF("id", "name", "ts")
.write
.format("hudi")
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
.mode(SaveMode.Append)
.save(tablePath)

// 3. Read back using Spark to validate read works.
val resultDf = spark.read.format("hudi").load(tablePath)
assertResult(3)(resultDf.count())
}
}

test("Test Create Table Via SQL Without Database Defaults To Default") {
withTempDir { tmp =>
val tableName = generateTableName

// Create table without specifying database (should default to current database)
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| ts long
|) using hudi
|tblproperties (
| primaryKey = 'id',
| orderingFields = 'ts'
|)
|location '${tmp.getCanonicalPath}'
""".stripMargin)

// Insert data
spark.sql(s"INSERT INTO $tableName VALUES (1, 'Alice', 100), (2, 'Bob', 200)")

// Read back
val resultDf = spark.sql(s"SELECT * FROM $tableName")
assertResult(2)(resultDf.count())
}
}

def writeAndValidateWithComplexKeyGenerator(spark: SparkSession,
tableVersion: Int,
tableName: String,