1 change: 0 additions & 1 deletion be/src/olap/push_handler.cpp
@@ -671,7 +671,6 @@ Status PushBrokerReader::_get_next_reader() {
const_cast<cctz::time_zone*>(&_runtime_state->timezone_obj()),
_io_ctx.get(), _runtime_state.get());

RETURN_IF_ERROR(parquet_reader->open());
std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(
_all_col_names, place_holder, _colname_to_value_range, _push_down_exprs,
7 changes: 6 additions & 1 deletion be/src/service/internal_service.cpp
@@ -884,7 +884,6 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
case TFileFormatType::FORMAT_AVRO: {
reader = vectorized::AvroJNIReader::create_unique(profile.get(), params, range,
file_slots);
st = ((vectorized::AvroJNIReader*)(reader.get()))->init_fetch_table_schema_reader();
break;
}
default:
@@ -893,6 +892,12 @@ void PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
st.to_protobuf(result->mutable_status());
return;
}
if (!st.ok()) {
LOG(WARNING) << "failed to create reader, errmsg=" << st;
st.to_protobuf(result->mutable_status());
return;
}
st = reader->init_schema_reader();
if (!st.ok()) {
LOG(WARNING) << "failed to init reader, errmsg=" << st;
st.to_protobuf(result->mutable_status());
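The two hunks above converge every format onto the same two-step error path: creation failures and `init_schema_reader()` failures are now reported identically, instead of the AVRO `case` invoking its own `init_fetch_table_schema_reader()` inside the switch. A minimal self-contained sketch of that control flow, using toy stand-ins for Doris' `Status` and reader types (not the real API):

```cpp
#include <iostream>
#include <memory>
#include <string>

// Toy stand-ins; Doris' Status and GenericReader are richer than this.
struct Status {
    std::string msg;                                // empty means OK
    bool ok() const { return msg.empty(); }
    static Status OK() { return {}; }
};

struct GenericReader {
    virtual ~GenericReader() = default;
    virtual Status init_schema_reader() { return {"not supported"}; }
};

struct AvroLikeReader : GenericReader {
    Status init_schema_reader() override { return Status::OK(); }
};

// Mirrors the shape of the patched fetch_table_schema: one creation
// check, then one format-agnostic init check.
Status fetch_table_schema_flow() {
    Status st;                                      // set during reader creation
    std::unique_ptr<GenericReader> reader = std::make_unique<AvroLikeReader>();
    if (!st.ok()) {
        std::cerr << "failed to create reader, errmsg=" << st.msg << '\n';
        return st;
    }
    st = reader->init_schema_reader();              // replaces per-case inits
    if (!st.ok()) {
        std::cerr << "failed to init reader, errmsg=" << st.msg << '\n';
        return st;
    }
    return Status::OK();
}

int main() { return fetch_table_schema_flow().ok() ? 0 : 1; }
```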
5 changes: 3 additions & 2 deletions be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -59,7 +59,7 @@ Status AvroJNIReader::get_columns(std::unordered_map<std::string, DataTypePtr>*
return Status::OK();
}

Status AvroJNIReader::init_fetch_table_reader(
Status AvroJNIReader::init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
_colname_to_value_range = colname_to_value_range;
std::ostringstream required_fields;
@@ -112,7 +112,8 @@ TFileType::type AvroJNIReader::get_file_type() const {
return type;
}

Status AvroJNIReader::init_fetch_table_schema_reader() {
// open the jni connector for parsing schema
Status AvroJNIReader::init_schema_reader() {
std::map<String, String> required_param = {{"uri", _range.path},
{"file_type", std::to_string(get_file_type())},
{"is_get_table_schema", "true"}};
4 changes: 2 additions & 2 deletions be/src/vec/exec/format/avro/avro_jni_reader.h
@@ -66,12 +66,12 @@ class AvroJNIReader : public JniReader {
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;

Status init_fetch_table_reader(
Status init_reader(
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

TFileType::type get_file_type() const;

Status init_fetch_table_schema_reader();
Status init_schema_reader() override;

Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) override;
77 changes: 37 additions & 40 deletions be/src/vec/exec/format/csv/csv_reader.cpp
@@ -392,14 +392,45 @@ Status CsvReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name
return Status::OK();
}

// init decompressor, file reader and line reader for parsing schema
Status CsvReader::init_schema_reader() {
_start_offset = _range.start_offset;
if (_start_offset != 0) {
return Status::InvalidArgument(
"start offset of TFileRangeDesc must be zero in get parsered schema");
}
if (_params.file_type == TFileType::FILE_BROKER) {
return Status::InternalError<false>(
"Getting parsered schema from csv file do not support stream load and broker "
"load.");
}

// csv file without names line and types line.
_read_line = 1;
_is_parse_name = false;

if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
!_params.file_attributes.header_type.empty()) {
std::string header_type = to_lower(_params.file_attributes.header_type);
if (header_type == BeConsts::CSV_WITH_NAMES) {
_is_parse_name = true;
} else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
_read_line = 2;
_is_parse_name = true;
}
}

RETURN_IF_ERROR(_init_options());
RETURN_IF_ERROR(_create_file_reader(true));
RETURN_IF_ERROR(_create_decompressor());
RETURN_IF_ERROR(_create_line_reader());
return Status::OK();
}

Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
size_t read_line = 0;
bool is_parse_name = false;
RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name));

if (read_line == 1) {
if (!is_parse_name) { //parse csv file without names and types
if (_read_line == 1) {
if (!_is_parse_name) { //parse csv file without names and types
size_t col_nums = 0;
RETURN_IF_ERROR(_parse_col_nums(&col_nums));
for (size_t i = 0; i < col_nums; ++i) {
@@ -705,40 +736,6 @@ void CsvReader::_split_line(const Slice& line) {
_fields_splitter->split_line(line, &_split_values);
}

Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
_start_offset = _range.start_offset;
if (_start_offset != 0) {
return Status::InvalidArgument(
"start offset of TFileRangeDesc must be zero in get parsered schema");
}
if (_params.file_type == TFileType::FILE_BROKER) {
return Status::InternalError<false>(
"Getting parsered schema from csv file do not support stream load and broker "
"load.");
}

// csv file without names line and types line.
*read_line = 1;
*is_parse_name = false;

if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
!_params.file_attributes.header_type.empty()) {
std::string header_type = to_lower(_params.file_attributes.header_type);
if (header_type == BeConsts::CSV_WITH_NAMES) {
*is_parse_name = true;
} else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
*read_line = 2;
*is_parse_name = true;
}
}

RETURN_IF_ERROR(_init_options());
RETURN_IF_ERROR(_create_file_reader(true));
RETURN_IF_ERROR(_create_decompressor());
RETURN_IF_ERROR(_create_line_reader());
return Status::OK();
}

Status CsvReader::_parse_col_nums(size_t* col_nums) {
const uint8_t* ptr = nullptr;
size_t size = 0;
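The `_prepare_parse` helper is folded into `init_schema_reader()` above, with its two out-parameters promoted to the `_read_line`/`_is_parse_name` members (see the header change below). A small self-contained sketch of the header_type decision as a pure function; the lowercase constant values are assumptions inferred from how `BeConsts` is compared against the lowered string here:

```cpp
#include <cassert>
#include <string>

// Assumed values of BeConsts::CSV_WITH_NAMES / CSV_WITH_NAMES_AND_TYPES.
const std::string kCsvWithNames = "csv_with_names";
const std::string kCsvWithNamesAndTypes = "csv_with_names_and_types";

struct SchemaParsePlan {
    size_t read_line;     // how many header lines to consume
    bool is_parse_name;   // whether the first line carries column names
};

// Same branching as CsvReader::init_schema_reader(), isolated for clarity.
SchemaParsePlan plan_for(const std::string& header_type_lower) {
    if (header_type_lower == kCsvWithNames) return {1, true};
    if (header_type_lower == kCsvWithNamesAndTypes) return {2, true};
    return {1, false};    // no header: read one line just to count columns
}

int main() {
    assert(plan_for("").read_line == 1 && !plan_for("").is_parse_name);
    assert(plan_for(kCsvWithNames).is_parse_name);
    assert(plan_for(kCsvWithNamesAndTypes).read_line == 2);
}
```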
7 changes: 4 additions & 3 deletions be/src/vec/exec/format/csv/csv_reader.h
@@ -181,6 +181,7 @@ class CsvReader : public GenericReader {
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;

Status init_schema_reader() override;
// get schema of csv file from first one line or first two lines.
// if file format is FORMAT_CSV_DEFLATE and if
// 1. header_type is empty, get schema from first line.
@@ -230,9 +231,6 @@
void _init_system_properties();
void _init_file_description();

// used for parse table schema of csv file.
// Currently, this feature is for table valued function.
Status _prepare_parse(size_t* read_line, bool* is_parse_name);
Status _parse_col_nums(size_t* col_nums);
Status _parse_col_names(std::vector<std::string>* col_names);
// TODO(ftw): parse type
@@ -262,6 +260,9 @@
// True if this is a load task
bool _is_load = false;
bool _line_reader_eof;
// For schema reader
size_t _read_line = 0;
bool _is_parse_name = false;
TFileFormatType::type _file_format_type;
bool _is_proto_format;
TFileCompressType::type _file_compress_type;
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/generic_reader.h
@@ -47,6 +47,11 @@ class GenericReader : public ProfileCollector {
return Status::NotSupported("get_columns is not implemented");
}

// This method is responsible for initializing the resource for parsing schema.
// It will be called before `get_parsed_schema`.
virtual Status init_schema_reader() {
return Status::NotSupported("init_schema_reader is not implemented for this reader.");
}
// `col_types` is always nullable to process illegal values.
virtual Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
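The base-class default makes the new method opt-in: formats that support schema probing override it, everything else reports `NotSupported`. Below is a runnable toy model of the two-phase contract the doc comment describes, where `init_schema_reader()` acquires resources and `get_parsed_schema()` assumes they exist; all types are illustrative stand-ins, not the Doris API:

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

using Status = std::string;                    // empty string models OK
const Status kOk;

struct GenericReader {
    virtual ~GenericReader() = default;
    // Phase 1: open files / decompressors / metadata for schema parsing.
    virtual Status init_schema_reader() { return "NotSupported"; }
    // Phase 2: relies on phase 1 having succeeded.
    virtual Status get_parsed_schema(std::vector<std::string>* names,
                                     std::vector<std::string>* types) {
        return "NotSupported";
    }
};

struct ToyReader : GenericReader {
    bool opened = false;
    Status init_schema_reader() override { opened = true; return kOk; }
    Status get_parsed_schema(std::vector<std::string>* names,
                             std::vector<std::string>* types) override {
        if (!opened) return "InternalError: init_schema_reader not called";
        names->push_back("c1");
        types->push_back("string");
        return kOk;
    }
};

int main() {
    std::unique_ptr<GenericReader> r = std::make_unique<ToyReader>();
    std::vector<std::string> names, types;
    Status st = r->init_schema_reader();       // always called first
    if (st == kOk) st = r->get_parsed_schema(&names, &types);
    std::cout << (st == kOk ? names[0] + ':' + types[0] : st) << '\n';
}
```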
13 changes: 9 additions & 4 deletions be/src/vec/exec/format/json/new_json_reader.cpp
@@ -257,18 +257,23 @@ Status NewJsonReader::get_columns(std::unordered_map<std::string, DataTypePtr>*
return Status::OK();
}

Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
// init decompressor, file reader and line reader for parsing schema
Status NewJsonReader::init_schema_reader() {
RETURN_IF_ERROR(_get_range_params());

// create decompressor.
// _decompressor may be nullptr if this is not a compressed file
RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
RETURN_IF_ERROR(_open_file_reader(true));
if (_read_json_by_line) {
RETURN_IF_ERROR(_open_line_reader());
}

// generate _parsed_jsonpaths and _parsed_json_root
RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
return Status::OK();
}

Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
bool eof = false;
const uint8_t* json_str = nullptr;
std::unique_ptr<uint8_t[]> json_str_ptr;
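`init_schema_reader()` now owns all the setup `get_parsed_schema` used to do inline: range params, an optional decompressor (left null for plain files), the file reader, an optional line reader, and jsonpath parsing. A stand-in sketch of that initialization order, with toy types in place of the Doris components:

```cpp
#include <iostream>
#include <memory>

// Illustrative stand-ins for the Doris components involved.
struct Decompressor {};
struct FileReader {};
struct LineReader {};

struct ToyJsonReader {
    std::unique_ptr<Decompressor> decompressor;  // stays null if uncompressed
    std::unique_ptr<FileReader> file_reader;
    std::unique_ptr<LineReader> line_reader;     // only for read_json_by_line

    bool init_schema_reader(bool compressed, bool read_json_by_line) {
        if (compressed) {                         // create_decompressor()
            decompressor = std::make_unique<Decompressor>();
        }
        file_reader = std::make_unique<FileReader>();  // _open_file_reader(true)
        if (read_json_by_line) {                  // _open_line_reader()
            line_reader = std::make_unique<LineReader>();
        }
        // _parse_jsonpath_and_json_root() would run last.
        return true;
    }
};

int main() {
    ToyJsonReader r;
    std::cout << (r.init_schema_reader(false, true) ? "ok" : "fail") << '\n';
}
```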
1 change: 1 addition & 0 deletions be/src/vec/exec/format/json/new_json_reader.h
@@ -83,6 +83,7 @@ class NewJsonReader : public GenericReader {
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
Status init_schema_reader() override;
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) override;

7 changes: 5 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -359,9 +359,13 @@ Status OrcReader::init_reader(
return Status::OK();
}

// init file reader for parsing schema
Status OrcReader::init_schema_reader() {
return _create_file_reader();
}

Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
RETURN_IF_ERROR(_create_file_reader());
const auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
col_names->emplace_back(get_field_name_lower_case(&root_type, i));
@@ -374,7 +378,6 @@ Status OrcReader::get_schema_col_name_attribute(std::vector<std::string>* col_na
std::vector<int32_t>* col_attributes,
const std::string& attribute,
bool* exist_attribute) {
RETURN_IF_ERROR(_create_file_reader());
*exist_attribute = true;
const auto& root_type = _reader->getType();
for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
@@ -180,6 +180,8 @@ class OrcReader : public GenericReader {
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;

Status init_schema_reader() override;

Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) override;

22 changes: 9 additions & 13 deletions be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -261,12 +261,6 @@ const FieldDescriptor ParquetReader::get_file_metadata_schema() {
return _file_metadata->schema();
}

Status ParquetReader::open() {
RETURN_IF_ERROR(_open_file());
_t_metadata = &(_file_metadata->to_thrift());
return Status::OK();
}

void ParquetReader::_init_system_properties() {
if (_scan_range.__isset.file_type) {
// for compatibility
@@ -313,10 +307,8 @@ Status ParquetReader::init_reader(
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_colname_to_value_range = colname_to_value_range;
_hive_use_column_names = hive_use_column_names;
if (_file_metadata == nullptr) {
return Status::InternalError("failed to init parquet reader, please open reader first");
}

RETURN_IF_ERROR(_open_file());
_t_metadata = &(_file_metadata->to_thrift());
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
_total_groups = _t_metadata->row_groups.size();
if (_total_groups == 0) {
@@ -496,11 +488,15 @@ Status ParquetReader::set_fill_columns(
return Status::OK();
}

Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
// init file reader and file metadata for parsing schema
Status ParquetReader::init_schema_reader() {
RETURN_IF_ERROR(_open_file());
_t_metadata = &_file_metadata->to_thrift();
_t_metadata = &(_file_metadata->to_thrift());
return Status::OK();
}

Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
_total_groups = _t_metadata->row_groups.size();
auto schema_desc = _file_metadata->schema();
for (int i = 0; i < schema_desc.size(); ++i) {
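With the public `open()` gone, `_open_file()` plus the thrift-metadata fetch happen inside whichever entry point runs first, so callers (push_handler, iceberg_reader, internal_service) no longer need the extra call and cannot hit the removed "please open reader first" error. A toy model of the consolidation, not the real class:

```cpp
#include <cassert>

// Stand-in for ParquetReader after this patch: both entry points
// perform the file-open + metadata step themselves.
struct ToyParquetReader {
    bool file_opened = false;
    bool metadata_loaded = false;

    void open_file() {                 // models _open_file() + to_thrift()
        file_opened = true;
        metadata_loaded = true;
    }
    bool init_reader() {               // data-scan path
        open_file();                   // previously the caller's job via open()
        return metadata_loaded;
    }
    bool init_schema_reader() {        // schema-probe path
        open_file();
        return metadata_loaded;
    }
};

int main() {
    ToyParquetReader scan, probe;
    assert(scan.init_reader());        // no separate open() beforehand
    assert(probe.init_schema_reader());
}
```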
4 changes: 2 additions & 2 deletions be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -106,8 +106,6 @@ class ParquetReader : public GenericReader {
// for test
void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; }

Status open();

Status init_reader(
const std::vector<std::string>& all_column_names,
const std::vector<std::string>& missing_column_names,
@@ -133,6 +131,8 @@
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;

Status init_schema_reader() override;

Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) override;

6 changes: 3 additions & 3 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -161,14 +161,14 @@ Status IcebergTableReader::_equality_delete_base(
delete_desc.file_size = -1;
std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc);
if (!init_schema) {
RETURN_IF_ERROR(delete_reader->init_schema_reader());
RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names,
&equality_delete_col_types));
_generate_equality_delete_block(&_equality_delete_block, equality_delete_col_names,
equality_delete_col_types);
init_schema = true;
}
if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) {
RETURN_IF_ERROR(parquet_reader->open());
RETURN_IF_ERROR(parquet_reader->init_reader(equality_delete_col_names,
not_in_file_col_names, nullptr, {}, nullptr,
nullptr, nullptr, nullptr, nullptr, false));
@@ -444,8 +444,6 @@ Status IcebergParquetReader ::_read_position_delete_file(const TFileRangeDesc* d
ParquetReader parquet_delete_reader(
_profile, _params, *delete_range, READ_DELETE_FILE_BATCH_SIZE,
const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, _state);

RETURN_IF_ERROR(parquet_delete_reader.open());
RETURN_IF_ERROR(parquet_delete_reader.init_reader(delete_file_col_names, {}, nullptr, {},
nullptr, nullptr, nullptr, nullptr, nullptr,
false));
@@ -540,6 +538,7 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
Status IcebergParquetReader::get_file_col_id_to_name(
bool& exist_schema, std::map<int32_t, std::string>& file_col_id_to_name) {
auto* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
RETURN_IF_ERROR(parquet_reader->init_schema_reader());
FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();

if (field_desc.has_parquet_field_id()) {
@@ -559,6 +558,7 @@ Status IcebergOrcReader::get_file_col_id_to_name(

std::vector<std::string> col_names;
std::vector<int32_t> col_ids;
RETURN_IF_ERROR(orc_reader->init_schema_reader());
RETURN_IF_ERROR(orc_reader->get_schema_col_name_attribute(
&col_names, &col_ids, ICEBERG_ORC_ATTRIBUTE, &exist_schema));
if (!exist_schema) {
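Caller side of the same contract: the equality-delete path and both `get_file_col_id_to_name` overloads now call `init_schema_reader()` on the inner reader before touching its schema, since metadata loading moved out of the accessors. A minimal stand-in showing why the ordering matters (toy types, not the Doris readers):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for an inner format reader (ORC/Parquet) used by the
// Iceberg table reader; schema accessors require prior init.
struct ToyInnerReader {
    bool inited = false;
    bool init_schema_reader() { inited = true; return true; }
    bool get_parsed_schema(std::vector<std::string>* names) {
        if (!inited) return false;     // metadata not loaded yet
        names->push_back("id");
        return true;
    }
};

int main() {
    ToyInnerReader reader;
    std::vector<std::string> names;
    assert(reader.init_schema_reader());       // must come first now
    assert(reader.get_parsed_schema(&names) && names.size() == 1);
}
```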
2 changes: 2 additions & 0 deletions be/src/vec/exec/format/wal/wal_reader.h
@@ -24,6 +24,8 @@ namespace doris::vectorized {
#include "common/compile_check_begin.h"
struct ScannerCounter;
class WalReader : public GenericReader {
ENABLE_FACTORY_CREATOR(WalReader);

public:
WalReader(RuntimeState* state);
~WalReader() override = default;