From b088e2d2c06f86ae7d955ea98672373ee54cff9f Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Wed, 8 May 2024 13:14:52 +0800
Subject: [PATCH 1/5] fix

---
 be/src/vec/runtime/vorc_transformer.cpp           | 12 ++++++++----
 be/src/vec/sink/writer/vhive_partition_writer.cpp | 10 ++++++++++
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp
index 3bf3d8cea70267..10fece59d4fd19 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -91,7 +91,10 @@ void VOrcOutputStream::write(const void* data, size_t length) {
     Status st = _file_writer->append({static_cast<const char*>(data), length});
     if (!st.ok()) {
         LOG(WARNING) << "Write to ORC file failed: " << st;
-        return;
+        // When a write error occurs, 
+        // the error needs to be thrown to the upper layer, 
+        // so that the FE can get the exception.
+        throw std::runtime_error(st.to_string());
     }
     _cur_pos += length;
     _written_len += length;
@@ -148,9 +151,10 @@ Status VOrcTransformer::open() {
     }
 
     _output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
-    _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
-    if (_writer == nullptr) {
-        return Status::InternalError("Failed to create file writer");
+    try {
+        _writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
+    } catch (const std::exception& e) {
+        return Status::InternalError("failed to create writer: {}", e.what());
     }
     return Status::OK();
 }
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 1e1faef7e8070e..dbaef030ad88cf 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -57,6 +57,16 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
     fs_properties.properties = &_hadoop_conf;
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())};
+    // If the destination path contains a scheme, use the scheme directly.
+    // If not, use defaultFS.
+    // Otherwise a write error will occur.
+    string::size_type idx = file_description.path.find("://");
+    if (idx != string::npos) {
+        idx = file_description.path.find("/", idx + 3);
+        if (idx != string::npos) {
+            file_description.fs_name = file_description.path.substr(0, idx);
+        }
+    }
     _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
     io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
     RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options));

From 5442e287e5a856905d68b7596bd386b2823020e2 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Wed, 8 May 2024 13:24:23 +0800
Subject: [PATCH 2/5] format

---
 be/src/vec/runtime/vorc_transformer.cpp | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/be/src/vec/runtime/vorc_transformer.cpp b/be/src/vec/runtime/vorc_transformer.cpp
index 10fece59d4fd19..5ea7ec860fb38d 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -91,8 +91,8 @@ void VOrcOutputStream::write(const void* data, size_t length) {
     Status st = _file_writer->append({static_cast<const char*>(data), length});
     if (!st.ok()) {
         LOG(WARNING) << "Write to ORC file failed: " << st;
-        // When a write error occurs, 
-        // the error needs to be thrown to the upper layer, 
+        // When a write error occurs,
+        // the error needs to be thrown to the upper layer,
         // so that the FE can get the exception.
         throw std::runtime_error(st.to_string());
     }

From 1ee2a0bc91ffe4b945fa45a7f64e6059fd47fd8c Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Wed, 8 May 2024 15:33:38 +0800
Subject: [PATCH 3/5] add test

---
 .../write/test_hive_write_different_path.out       |  11 ++
 .../test_hive_write_different_path.groovy          | 102 ++++++++++++++++++
 2 files changed, 113 insertions(+)
 create mode 100644 regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out
 create mode 100644 regression-test/suites/external_table_p0/hive/write/test_hive_write_different_path.groovy

diff --git a/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out b/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out
new file mode 100644
index 00000000000000..b007d074a1fef4
--- /dev/null
+++ b/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+1	a	a	1
+2	b	a	1
+3	c	a	1
+
+-- !q02 --
+4	d	a	1
+5	e	a	1
+6	f	a	1
+
diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_write_different_path.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_write_different_path.groovy
new file mode 100644
index 00000000000000..744e474539c35c
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/write/test_hive_write_different_path.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_write_different_path", "p0,external,hive,external_docker,external_docker_hive") {
+
+    for (String hivePrefix : ["hive2", "hive3"]) {
+
+        String enabled = context.config.otherConfigs.get("enableHiveTest")
+        if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+            logger.info("disable Hive test.")
+            return;
+        }
+
+        setHivePrefix(hivePrefix)
+        try {
+            String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+            String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+            String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+            String catalog1 = "test_${hivePrefix}_write_insert_without_defaultfs"
+            String catalog2 = "test_${hivePrefix}_write_insert_with_external_ip"
+            String catalog3 = "test_${hivePrefix}_write_insert_with_local_ip"
+            String localEnvIp = "127.0.0.1"
+
+            sql """drop catalog if exists ${catalog1}"""
+            sql """drop catalog if exists ${catalog2}"""
+            sql """drop catalog if exists ${catalog3}"""
+            sql """create catalog if not exists ${catalog1} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+            );"""
+
+            sql """ use ${catalog1}.write_test """
+            sql """ drop table if exists tb_with_external_ip """
+            sql """ drop table if exists tb_with_local_ip """
+
+            sql """
+                CREATE TABLE `tb_with_external_ip`
+                (
+                    `col_bigint_undef_signed` BIGINT NULL,
+                    `col_varchar_10__undef_signed` VARCHAR(10) NULL,
+                    `col_varchar_64__undef_signed` VARCHAR(64) NULL,
+                    `pk` INT NULL ) properties (
+                    'location' = 'hdfs://${externalEnvIp}:${hdfs_port}/user/hive/warehouse/write_test.db/tb_with_external_ip/'
+                );
+            """
+
+            sql """
+                CREATE TABLE `tb_with_local_ip`
+                (
+                    `col_bigint_undef_signed` BIGINT NULL,
+                    `col_varchar_10__undef_signed` VARCHAR(10) NULL,
+                    `col_varchar_64__undef_signed` VARCHAR(64) NULL,
+                    `pk` INT NULL ) properties (
+                    'location' = 'hdfs://${localEnvIp}:${hdfs_port}/user/hive/warehouse/write_test.db/tb_with_local_ip/'
+                );
+            """
+
+            sql """create catalog if not exists ${catalog2} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+                'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}'
+            );"""
+            sql """create catalog if not exists ${catalog3} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}',
+                'fs.defaultFS' = 'hdfs://${localEnvIp}:${hdfs_port}'
+            );"""
+
+            sql """ insert into ${catalog1}.write_test.tb_with_external_ip values (1,'a','a',1) """
+            sql """ insert into ${catalog2}.write_test.tb_with_external_ip values (2,'b','a',1) """
+            sql """ insert into ${catalog3}.write_test.tb_with_external_ip values (3,'c','a',1) """
+            sql """ insert into ${catalog1}.write_test.tb_with_local_ip values (4,'d','a',1) """
+            sql """ insert into ${catalog2}.write_test.tb_with_local_ip values (5,'e','a',1) """
+            sql """ insert into ${catalog3}.write_test.tb_with_local_ip values (6,'f','a',1) """
+
+            qt_q01 """ select * from ${catalog1}.write_test.tb_with_external_ip order by col_bigint_undef_signed """
+            qt_q02 """ select * from ${catalog1}.write_test.tb_with_local_ip order by col_bigint_undef_signed """
+
+            sql """drop catalog if exists ${catalog1}"""
+            sql """drop catalog if exists ${catalog2}"""
+            sql """drop catalog if exists ${catalog3}"""
+
+        } finally {
+        }
+    }
+}

From ec8f8fe8e5a188e9b877b92eda0bea4b29efeb15 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Wed, 8 May 2024 18:37:53 +0800
Subject: [PATCH 4/5] fix

---
 .../hive/write/test_hive_write_different_path.out | 10 ++++++++++
 1 file changed, 10 insertions(+)
diff --git a/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out b/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out
index b007d074a1fef4..626a46763ea3a3 100644
--- a/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out
+++ b/regression-test/data/external_table_p0/hive/write/test_hive_write_different_path.out
@@ -9,3 +9,13 @@
 5	e	a	1
 6	f	a	1
 
+-- !q01 --
+1	a	a	1
+2	b	a	1
+3	c	a	1
+
+-- !q02 --
+4	d	a	1
+5	e	a	1
+6	f	a	1
+

From 5f68d81dbf32bb0cb8d32193e8d9b742e49181c4 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Tue, 14 May 2024 10:52:17 +0800
Subject: [PATCH 5/5] add comment

---
 be/src/vec/sink/writer/vhive_partition_writer.cpp | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index dbaef030ad88cf..f0e4a168326559 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -60,6 +60,9 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
     // If the destination path contains a scheme, use the scheme directly.
     // If not, use defaultFS.
     // Otherwise a write error will occur.
+    // example:
+    // hdfs://host:port/path1/path2 --> hdfs://host:port
+    // hdfs://nameservice/path1/path2 --> hdfs://nameservice
     string::size_type idx = file_description.path.find("://");
     if (idx != string::npos) {
         idx = file_description.path.find("/", idx + 3);
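
A minimal stand-alone sketch of the fs_name extraction this series adds in PATCH 1/5 and documents in PATCH 5/5. The function name `extract_fs_name` and the `main` driver are hypothetical, added only for illustration; in the patch the same string logic runs inline on `file_description.path`:

```cpp
#include <iostream>
#include <optional>
#include <string>

// Sketch of the fs_name derivation from VHivePartitionWriter::open():
// keep everything up to the first '/' after "://" (scheme + authority),
// and return no result when the path has no scheme or no trailing path,
// in which case the writer falls back to the catalog's defaultFS.
std::optional<std::string> extract_fs_name(const std::string& path) {
    std::string::size_type idx = path.find("://");
    if (idx == std::string::npos) {
        return std::nullopt; // no scheme, e.g. "/user/hive/warehouse/t"
    }
    idx = path.find('/', idx + 3); // first '/' after the authority
    if (idx == std::string::npos) {
        return std::nullopt;
    }
    return path.substr(0, idx); // e.g. "hdfs://host:8020"
}

int main() {
    // The two shapes named in the PATCH 5/5 comment, plus the fallback case.
    std::cout << extract_fs_name("hdfs://host:8020/path1/path2").value_or("<defaultFS>") << '\n';
    std::cout << extract_fs_name("hdfs://nameservice/path1/path2").value_or("<defaultFS>") << '\n';
    std::cout << extract_fs_name("/user/hive/warehouse/t").value_or("<defaultFS>") << '\n';
}
```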
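
Likewise, a sketch of the error-propagation change in vorc_transformer.cpp. `Status` below is a bare stand-in for the Doris class, and `stream_write`/`open_writer` are hypothetical stand-ins for `VOrcOutputStream::write` and `VOrcTransformer::open`; what it shows is the control flow only: the output stream now throws instead of silently returning, and the new try/catch converts any exception from the ORC layer back into a `Status` that can reach the FE:

```cpp
#include <iostream>
#include <stdexcept>
#include <string>

// Bare stand-in for doris::Status, only for this sketch.
struct Status {
    bool ok_ = true;
    std::string msg_;
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_; }
};

// Stand-in for VOrcOutputStream::write(): before the patch a failed append
// only logged and returned, so the ORC writer kept writing to a broken
// stream; after the patch the failure is thrown to the upper layer.
void stream_write(bool simulate_io_error) {
    if (simulate_io_error) {
        throw std::runtime_error("Write to ORC file failed: simulated I/O error");
    }
}

// Stand-in for VOrcTransformer::open(): mirrors the try/catch the patch puts
// around orc::createWriter(), converting exceptions back into a Status.
Status open_writer(bool simulate_io_error) {
    try {
        stream_write(simulate_io_error);
    } catch (const std::exception& e) {
        return Status::InternalError(std::string("failed to create writer: ") + e.what());
    }
    return Status::OK();
}

int main() {
    std::cout << open_writer(false).ok() << '\n'; // 1
    std::cout << open_writer(true).msg_ << '\n';  // failed to create writer: ...
}
```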