Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,16 @@ public void finishInsertTable(NameMapping nameMapping) {
case NEW:
case OVERWRITE:
StorageDescriptor sd = table.getSd();
// For object storage (FILE_S3), use writePath to keep original scheme (oss://, cos://)
// For HDFS, use targetPath which is the final path after rename
String pathForHMS = this.fileType == TFileType.FILE_S3
? writePath
: pu.getLocation().getTargetPath();
HivePartition hivePartition = new HivePartition(
nameMapping,
false,
sd.getInputFormat(),
pu.getLocation().getTargetPath(),
pathForHMS,
HiveUtil.toPartitionValues(pu.getName()),
Maps.newHashMap(),
sd.getOutputFormat(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,106 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
assert dropResult.size() == 0
}

/*--------test partition table insert---------*/
def testPartitionTableInsert = { String catalogProperties, String prefix, String dbLocation ->
def catalog_name = "${prefix}_catalog"
sql """
DROP CATALOG IF EXISTS ${catalog_name};
"""
sql """
CREATE CATALOG IF NOT EXISTS ${catalog_name} PROPERTIES (
${catalogProperties}
);
"""
sql """
switch ${catalog_name};
"""

def db_name = prefix + "_db" + System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(1000)
sql """
DROP DATABASE IF EXISTS ${db_name} FORCE;
"""
sql """
CREATE DATABASE IF NOT EXISTS ${db_name}
PROPERTIES ('location'='${dbLocation}');
"""

def dbResult = sql """
show databases like "${db_name}";
"""
assert dbResult.size() == 1

sql """
use ${db_name};
"""

def table_name = prefix + ThreadLocalRandom.current().nextInt(1000) + "_partition_table"

// Create partitioned table
sql """
CREATE TABLE ${table_name} (
id INT COMMENT 'id',
name VARCHAR(20) COMMENT 'name',
age INT COMMENT 'age',
pt1 VARCHAR(20) COMMENT 'partition key'
) ENGINE=hive
PARTITION BY LIST (pt1) ()
PROPERTIES (
'file_format'='orc',
'compression'='zlib'
);
"""

// Test 1: Insert into new partition
sql """
insert into ${table_name} values (1, 'alice', 20, 'p1');
"""
def result1 = sql """
SELECT * FROM ${table_name} ORDER BY id;
"""
assert result1.size() == 1
assert result1[0][0] == 1

// Test 2: Insert into existing partition (APPEND mode)
sql """
insert into ${table_name} values (2, 'bob', 25, 'p1');
"""
def result2 = sql """
SELECT * FROM ${table_name} WHERE pt1='p1' ORDER BY id;
"""
assert result2.size() == 2

// Test 3: Insert into another new partition
sql """
insert into ${table_name} values (3, 'charlie', 30, 'p2');
"""
def result3 = sql """
SELECT * FROM ${table_name} ORDER BY id;
"""
assert result3.size() == 3

// Test 4: Multiple inserts to verify scheme consistency
sql """
insert into ${table_name} values (4, 'david', 35, 'p1'), (5, 'eve', 28, 'p2');
"""
def result4 = sql """
SELECT COUNT(*) FROM ${table_name};
"""
assert result4[0][0] == 5

sql """
DROP TABLE ${table_name};
"""
sql """
DROP DATABASE ${db_name} FORCE;
"""

def dropResult = sql """
show databases like "${db_name}";
"""
assert dropResult.size() == 0
}

/*--------only execute query---------*/
def testQuery = { String catalog_properties, String prefix, String db_name, String table_name, int data_count ->

Expand Down Expand Up @@ -267,6 +367,10 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
testQueryAndInsert(hms_properties + obs_region_param + obs_storage_properties, "hive_hms_obs_test_region", db_location)
testQueryAndInsert(hms_type_properties + hms_kerberos_old_prop + obs_storage_properties, "hive_hms_on_obs_kerberos_old", db_location)
testQueryAndInsert(hms_type_properties + hms_kerberos_new_prop + obs_storage_properties, "hive_hms_on_obs_kerberos_new", db_location)

//OBS - Partition table tests
db_location = "obs://${obs_parent_path}/hive/hms/partition/" + System.currentTimeMillis()
testPartitionTableInsert(hms_properties + obs_storage_properties, "hive_hms_obs_partition_test", db_location)
//GCS
if(context.config.otherConfigs.get("enableGCS")){
db_location = "gs://${gcs_parent_path}/hive/hms/" + System.currentTimeMillis()
Expand All @@ -283,6 +387,10 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
testQueryAndInsert(hms_type_properties + hms_kerberos_old_prop + cos_storage_properties, "hive_hms_on_cos_kerberos_old", db_location)
testQueryAndInsert(hms_type_properties + hms_kerberos_new_prop + cos_storage_properties, "hive_hms_on_cos_kerberos_new", db_location)

//COS - Partition table tests
db_location = "cosn://${cos_parent_path}/hive/hms/partition/" + System.currentTimeMillis()
testPartitionTableInsert(hms_properties + cos_storage_properties, "hive_hms_cos_partition_test", db_location)

db_location = "cos://${cos_parent_path}/hive/hms/" + System.currentTimeMillis()
testQueryAndInsert(hms_properties + cos_storage_properties, "hive_hms_cos_test", db_location)

Expand All @@ -293,6 +401,11 @@ suite("hive_on_hms_and_dlf", "p2,external,new_catalog_property") {
testQueryAndInsert(hms_type_properties + hms_kerberos_old_prop + oss_storage_properties, "hive_hms_on_oss_kerberos_old", db_location)
testQueryAndInsert(hms_type_properties + hms_kerberos_new_prop + oss_storage_properties, "hive_hms_on_oss_kerberos_new", db_location)

//OSS - Partition table tests (fix for partition path scheme mismatch)
db_location = "oss://${oss_parent_path}/hive/hms/partition/" + System.currentTimeMillis()
testPartitionTableInsert(hms_properties + oss_storage_properties, "hive_hms_oss_partition_test", db_location)
testPartitionTableInsert(hms_properties + oss_region_param + oss_storage_properties, "hive_hms_oss_partition_test_region", db_location)

//s3
db_location = "s3a://${s3_parent_path}/hive/hms/"+System.currentTimeMillis()
testQueryAndInsert(hms_properties + s3_storage_properties, "hive_hms_s3_test", db_location)
Expand Down
Loading