diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index a474b76a51814e..e5c8169155802c 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -361,7 +361,7 @@ Status ColumnChunkReader::seek_to_nested_row(size_t
     } else {
         while (true) {
             RETURN_IF_ERROR(parse_page_header());
-            if (_page_reader->is_header_v2()) {
+            if (_page_reader->is_header_v2() || !IN_COLLECTION) {
                 if (_page_reader->start_row() <= left_row && left_row < _page_reader->end_row()) {
                     RETURN_IF_ERROR(load_page_data());
                     // this page contain this row.
@@ -447,11 +447,11 @@ Status ColumnChunkReader::load_page_nested_rows(
     *result_rows = 0;
     rep_levels.reserve(rep_levels.size() + _remaining_rep_nums);
     while (_remaining_rep_nums) {
-        level_t rep_level = _rep_level_decoder.get_next();
+        level_t rep_level = _rep_level_get_next();
         if (rep_level == 0) { // rep_level 0 indicates start of new row
             if (*result_rows == max_rows) { // this page contain max_rows, page no end.
                 _current_row += max_rows;
-                _rep_level_decoder.rewind_one();
+                _rep_level_rewind_one();
                 return Status::OK();
             }
             (*result_rows)++;
@@ -462,8 +462,8 @@ Status ColumnChunkReader::load_page_nested_rows(
     _current_row += *result_rows;
 
     auto need_check_cross_page = [&]() -> bool {
-        return !OFFSET_INDEX && _remaining_rep_nums == 0 && !_page_reader->is_header_v2() &&
-               has_next_page();
+        return !OFFSET_INDEX && IN_COLLECTION && _remaining_rep_nums == 0 &&
+               !_page_reader->is_header_v2() && has_next_page();
     };
     *cross_page = need_check_cross_page();
     return Status::OK();
@@ -478,10 +478,10 @@ Status ColumnChunkReader::load_cross_page_nested_ro
 
     *cross_page = has_next_page();
     while (_remaining_rep_nums) {
-        level_t rep_level = _rep_level_decoder.get_next();
+        level_t rep_level = _rep_level_get_next();
         if (rep_level == 0) { // rep_level 0 indicates start of new row
             *cross_page = false;
-            _rep_level_decoder.rewind_one();
+            _rep_level_rewind_one();
             break;
         }
         _remaining_rep_nums--;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 1270e5e37fcd1e..9e77a3139f60fb 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -206,6 +206,19 @@ class ColumnChunkReader {
     void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data);
     Status _skip_nested_rows_in_page(size_t num_rows);
 
+    level_t _rep_level_get_next() {
+        if constexpr (IN_COLLECTION) {
+            return _rep_level_decoder.get_next();
+        }
+        return 0;
+    }
+
+    void _rep_level_rewind_one() {
+        if constexpr (IN_COLLECTION) {
+            _rep_level_decoder.rewind_one();
+        }
+    }
+
     ColumnChunkReaderState _state = NOT_INIT;
     FieldSchema* _field_schema = nullptr;
     const level_t _max_rep_level;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 0917ca7cd06fb2..951cb69f91786b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -118,8 +118,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
         RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                element_reader, max_buf_size, col_offsets, true, column_ids,
                                filter_column_ids));
-        // element_reader->set_nested_column();
         auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
+        element_reader->set_column_in_nested();
         RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
         array_reader->_filter_column_ids = filter_column_ids;
         reader.reset(array_reader.release());
@@ -133,7 +133,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                    key_reader, max_buf_size, col_offsets, true, column_ids,
                                    filter_column_ids));
-            // key_reader->set_nested_column();
         } else {
             auto skip_reader = std::make_unique(row_ranges, total_rows, ctz, io_ctx,
                                                 &field->children[0]);
@@ -146,7 +145,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
             RETURN_IF_ERROR(create(file, &field->children[1], row_group, row_ranges, ctz, io_ctx,
                                    value_reader, max_buf_size, col_offsets, true, column_ids,
                                    filter_column_ids));
-            // value_reader->set_nested_column();
         } else {
             auto skip_reader = std::make_unique(row_ranges, total_rows, ctz, io_ctx,
                                                 &field->children[0]);
@@ -154,6 +152,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
         }
 
         auto map_reader = MapColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
+        key_reader->set_column_in_nested();
+        value_reader->set_column_in_nested();
         RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
         map_reader->_filter_column_ids = filter_column_ids;
         reader.reset(map_reader.release());
@@ -168,7 +168,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                 RETURN_IF_ERROR(create(file, &child, row_group, row_ranges, ctz, io_ctx,
                                        child_reader, max_buf_size, col_offsets, in_collection,
                                        column_ids, filter_column_ids));
-                // child_reader->set_nested_column();
                 child_readers[child.name] = std::move(child_reader);
                 // Record the first non-SkippingReader
                 if (non_skip_reader_idx == -1) {
@@ -180,6 +179,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                 skip_reader->_filter_column_ids = filter_column_ids;
                 child_readers[child.name] = std::move(skip_reader);
             }
+            child_readers[child.name]->set_column_in_nested();
         }
         // If all children are SkipReadingReader, force the first child to call create
         if (non_skip_reader_idx == -1) {
@@ -187,7 +187,7 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                    child_reader, max_buf_size, col_offsets, in_collection,
                                    column_ids, filter_column_ids));
-            // child_reader->set_nested_column();
+            child_reader->set_column_in_nested();
             child_readers[field->children[0].name] = std::move(child_reader);
         }
         auto struct_reader = StructColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
@@ -201,8 +201,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                               : nullptr;
     const tparquet::ColumnChunk& chunk = row_group.columns[physical_index];
-
-    // ScalarColumnReader::create_unique(row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
     if (in_collection) {
         if (offset_index == nullptr) {
             auto scalar_reader = ScalarColumnReader::create_unique(
@@ -354,9 +352,6 @@ Status ScalarColumnReader::_read_values(size_t num_
                 return Status::InternalError("Failed to decode definition level.");
             }
-            for (int i = 0; i < loop_read; i++) {
-                _def_levels.emplace_back(def_level);
-            }
             bool is_null = def_level < _field_schema->definition_level;
             if (!(prev_is_null ^ is_null)) {
                 null_map.emplace_back(0);
@@ -371,14 +366,11 @@ Status ScalarColumnReader::_read_values(size_t num_
                 prev_is_null = is_null;
                 has_read += loop_read;
             }
-        } else {
-            _def_levels.resize(_def_levels.size() + num_values, 0);
         }
     } else {
         if (_chunk_reader->max_def_level() > 0) {
             return Status::Corruption("Not nullable column has null values in parquet file");
         }
-        _def_levels.resize(_def_levels.size() + num_values, 0);
         data_column = doris_column->assume_mutable();
     }
     if (null_map.size() == 0) {
@@ -560,7 +552,7 @@ Status ScalarColumnReader::read_column_data(
     _rep_levels.clear();
     *read_rows = 0;
 
-    if constexpr (IN_COLLECTION) {
+    if (_in_nested) {
         RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map,
                                             batch_size, read_rows, eof, is_dict_filter));
         return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
@@ -574,7 +566,6 @@
     } else {
         right_row = _chunk_reader->page_end_row();
     }
-    auto before_filter_map_index = _filter_map_index;
 
     do {
         // generate the row ranges that should be read
@@ -641,18 +632,6 @@
         }
     }
 
-    if (filter_map.has_filter()) {
-        size_t new_rep_sz = 0;
-        for (size_t idx = before_filter_map_index; idx < _filter_map_index; idx++) {
-            if (filter_map.filter_map_data()[idx]) {
-                _def_levels[new_rep_sz] = _def_levels[idx - before_filter_map_index];
-                new_rep_sz++;
-            }
-        }
-        _def_levels.resize(new_rep_sz);
-    }
-    _rep_levels.resize(_def_levels.size(), 0);
-
     return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
                                is_dict_filter);
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 4a49473a69fe26..1359c391336534 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -141,6 +141,7 @@ class ParquetColumnReader {
     virtual void reset_filter_map_index() = 0;
 
     FieldSchema* get_field_schema() const { return _field_schema; }
+    void set_column_in_nested() { _in_nested = true; }
 
 protected:
     void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const;
@@ -155,6 +156,10 @@
 
     size_t _filter_map_index = 0;
     std::set _filter_column_ids;
+
+    // _in_nested: column in struct/map/array
+    // IN_COLLECTION : column in map/array
+    bool _in_nested = false;
 };
 
 template