14 changes: 7 additions & 7 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -361,7 +361,7 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::seek_to_nested_row(size_t
     } else {
         while (true) {
             RETURN_IF_ERROR(parse_page_header());
-            if (_page_reader->is_header_v2()) {
+            if (_page_reader->is_header_v2() || !IN_COLLECTION) {
                 if (_page_reader->start_row() <= left_row && left_row < _page_reader->end_row()) {
                     RETURN_IF_ERROR(load_page_data());
                     // this page contains this row.
@@ -447,11 +447,11 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_nested_rows(
     *result_rows = 0;
     rep_levels.reserve(rep_levels.size() + _remaining_rep_nums);
     while (_remaining_rep_nums) {
-        level_t rep_level = _rep_level_decoder.get_next();
+        level_t rep_level = _rep_level_get_next();
         if (rep_level == 0) { // rep_level 0 indicates start of new row
             if (*result_rows == max_rows) { // this page contains max_rows rows but has not ended
                 _current_row += max_rows;
-                _rep_level_decoder.rewind_one();
+                _rep_level_rewind_one();
                 return Status::OK();
             }
             (*result_rows)++;
@@ -462,8 +462,8 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_page_nested_rows(
     _current_row += *result_rows;
 
     auto need_check_cross_page = [&]() -> bool {
-        return !OFFSET_INDEX && _remaining_rep_nums == 0 && !_page_reader->is_header_v2() &&
-               has_next_page();
+        return !OFFSET_INDEX && IN_COLLECTION && _remaining_rep_nums == 0 &&
+               !_page_reader->is_header_v2() && has_next_page();
     };
     *cross_page = need_check_cross_page();
     return Status::OK();
@@ -478,10 +478,10 @@ Status ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>::load_cross_page_nested_ro
 
     *cross_page = has_next_page();
     while (_remaining_rep_nums) {
-        level_t rep_level = _rep_level_decoder.get_next();
+        level_t rep_level = _rep_level_get_next();
         if (rep_level == 0) { // rep_level 0 indicates start of new row
             *cross_page = false;
-            _rep_level_decoder.rewind_one();
+            _rep_level_rewind_one();
             break;
         }
         _remaining_rep_nums--;
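The loops above all lean on the same Parquet convention: a repetition level of 0 marks the first value of a new row, and when a batch fills up, the level that would begin the next row is pushed back with `_rep_level_rewind_one()`. A minimal standalone sketch of that counting pattern (the `count_rows` helper is illustrative only, not part of this patch):

```cpp
#include <cstdio>
#include <vector>

// Count up to max_rows rows in a stream of repetition levels. Returns the
// number of levels consumed; the level that would start the row after
// max_rows is deliberately left unconsumed, mirroring what
// _rep_level_rewind_one() achieves on the real decoder.
size_t count_rows(const std::vector<int>& rep_levels, size_t max_rows,
                  size_t* result_rows) {
    *result_rows = 0;
    size_t consumed = 0;
    for (int rep_level : rep_levels) {
        if (rep_level == 0) {               // start of a new row
            if (*result_rows == max_rows) { // batch full: stop before this row
                return consumed;
            }
            (*result_rows)++;
        }
        consumed++;
    }
    return consumed;
}

int main() {
    // An array column with rows [a,b], [c], [d,e,f] encodes levels 0 1 0 0 1 1.
    std::vector<int> levels = {0, 1, 0, 0, 1, 1};
    size_t rows = 0;
    size_t used = count_rows(levels, /*max_rows=*/2, &rows);
    std::printf("rows=%zu consumed=%zu\n", rows, used); // rows=2 consumed=3
    return 0;
}
```

With `max_rows = 2` the sketch stops after consuming three levels: the `0` that would start the third row stays in the stream for the next call.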
13 changes: 13 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -206,6 +206,19 @@ class ColumnChunkReader {
     void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data);
     Status _skip_nested_rows_in_page(size_t num_rows);
 
+    level_t _rep_level_get_next() {
+        if constexpr (IN_COLLECTION) {
+            return _rep_level_decoder.get_next();
+        }
+        return 0;
+    }
+
+    void _rep_level_rewind_one() {
+        if constexpr (IN_COLLECTION) {
+            _rep_level_decoder.rewind_one();
+        }
+    }
+
     ColumnChunkReaderState _state = NOT_INIT;
     FieldSchema* _field_schema = nullptr;
     const level_t _max_rep_level;
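These two new helpers are what lets `load_page_nested_rows` and `load_cross_page_nested_rows` run unchanged for flat columns, where no repetition-level decoder exists. A rough self-contained illustration of the `if constexpr` pattern (stand-in types, not Doris code): the untaken branch is discarded at compile time, so a `Reader<false>` never touches the decoder and always reports level 0, i.e. every value begins a new row.

```cpp
#include <iostream>

// Stand-in for the real RLE repetition-level decoder.
struct RepLevelDecoder {
    int _next = 0;
    int get_next() { return _next++; }
    void rewind_one() { --_next; }
};

template <bool IN_COLLECTION>
struct Reader {
    RepLevelDecoder _rep_level_decoder;

    int rep_level_get_next() {
        if constexpr (IN_COLLECTION) {
            return _rep_level_decoder.get_next();
        }
        // Flat column: no repetition levels exist, so every value is
        // treated as the start of a new row. The branch above is
        // discarded entirely for Reader<false>.
        return 0;
    }
};

int main() {
    Reader<true> nested;
    Reader<false> flat;
    std::cout << nested.rep_level_get_next() << ' '  // 0 (then 1, 2, ...)
              << flat.rep_level_get_next() << '\n';  // always 0
    return 0;
}
```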
33 changes: 6 additions & 27 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -118,8 +118,8 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
         RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                element_reader, max_buf_size, col_offsets, true, column_ids,
                                filter_column_ids));
-        // element_reader->set_nested_column();
         auto array_reader = ArrayColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
+        element_reader->set_column_in_nested();
         RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
         array_reader->_filter_column_ids = filter_column_ids;
         reader.reset(array_reader.release());
@@ -133,7 +133,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                    key_reader, max_buf_size, col_offsets, true, column_ids,
                                    filter_column_ids));
-            // key_reader->set_nested_column();
         } else {
             auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
                                                                    io_ctx, &field->children[0]);
@@ -146,14 +145,15 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
             RETURN_IF_ERROR(create(file, &field->children[1], row_group, row_ranges, ctz, io_ctx,
                                    value_reader, max_buf_size, col_offsets, true, column_ids,
                                    filter_column_ids));
-            // value_reader->set_nested_column();
         } else {
             auto skip_reader = std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz,
                                                                    io_ctx, &field->children[0]);
             value_reader = std::move(skip_reader);
         }
 
         auto map_reader = MapColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
+        key_reader->set_column_in_nested();
+        value_reader->set_column_in_nested();
         RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
         map_reader->_filter_column_ids = filter_column_ids;
         reader.reset(map_reader.release());
@@ -168,7 +168,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                 RETURN_IF_ERROR(create(file, &child, row_group, row_ranges, ctz, io_ctx,
                                        child_reader, max_buf_size, col_offsets, in_collection,
                                        column_ids, filter_column_ids));
-                // child_reader->set_nested_column();
                 child_readers[child.name] = std::move(child_reader);
                 // Record the first non-SkippingReader
                 if (non_skip_reader_idx == -1) {
@@ -180,14 +179,15 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                 skip_reader->_filter_column_ids = filter_column_ids;
                 child_readers[child.name] = std::move(skip_reader);
             }
+            child_readers[child.name]->set_column_in_nested();
         }
         // If all children are SkipReadingReader, force the first child to call create
         if (non_skip_reader_idx == -1) {
             std::unique_ptr<ParquetColumnReader> child_reader;
             RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                    child_reader, max_buf_size, col_offsets, in_collection,
                                    column_ids, filter_column_ids));
-            // child_reader->set_nested_column();
+            child_reader->set_column_in_nested();
             child_readers[field->children[0].name] = std::move(child_reader);
         }
         auto struct_reader = StructColumnReader::create_unique(row_ranges, total_rows, ctz, io_ctx);
@@ -201,8 +201,6 @@ Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                     : nullptr;
 
     const tparquet::ColumnChunk& chunk = row_group.columns[physical_index];
-
-    // ScalarColumnReader::create_unique(row_ranges, total_rows, chunk, offset_index, ctz, io_ctx);
     if (in_collection) {
         if (offset_index == nullptr) {
             auto scalar_reader = ScalarColumnReader<true, false>::create_unique(
@@ -354,9 +352,6 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_values(size_t num_
                 return Status::InternalError("Failed to decode definition level.");
             }
 
-            for (int i = 0; i < loop_read; i++) {
-                _def_levels.emplace_back(def_level);
-            }
             bool is_null = def_level < _field_schema->definition_level;
             if (!(prev_is_null ^ is_null)) {
                 null_map.emplace_back(0);
@@ -371,14 +366,11 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::_read_values(size_t num_
                 prev_is_null = is_null;
                 has_read += loop_read;
             }
-        } else {
-            _def_levels.resize(_def_levels.size() + num_values, 0);
         }
     } else {
         if (_chunk_reader->max_def_level() > 0) {
             return Status::Corruption("Not nullable column has null values in parquet file");
         }
-        _def_levels.resize(_def_levels.size() + num_values, 0);
         data_column = doris_column->assume_mutable();
     }
     if (null_map.size() == 0) {
@@ -560,7 +552,7 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_column_data(
     _rep_levels.clear();
     *read_rows = 0;
 
-    if constexpr (IN_COLLECTION) {
+    if (_in_nested) {
         RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, batch_size,
                                             read_rows, eof, is_dict_filter));
         return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
@@ -574,7 +566,6 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_column_data(
     } else {
         right_row = _chunk_reader->page_end_row();
     }
-    auto before_filter_map_index = _filter_map_index;
 
     do {
         // generate the row ranges that should be read
@@ -641,18 +632,6 @@ Status ScalarColumnReader<IN_COLLECTION, OFFSET_INDEX>::read_column_data(
         }
     }
 
-    if (filter_map.has_filter()) {
-        size_t new_rep_sz = 0;
-        for (size_t idx = before_filter_map_index; idx < _filter_map_index; idx++) {
-            if (filter_map.filter_map_data()[idx]) {
-                _def_levels[new_rep_sz] = _def_levels[idx - before_filter_map_index];
-                new_rep_sz++;
-            }
-        }
-        _def_levels.resize(new_rep_sz);
-    }
-    _rep_levels.resize(_def_levels.size(), 0);
-
     return _converter->convert(resolved_column, _field_schema->data_type, type, doris_column,
                                is_dict_filter);
 }
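A detail worth spelling out from the `_read_values` hunks above: `null_map` holds run lengths rather than per-value flags, with non-null and null runs alternating, and `null_map.emplace_back(0)` inserts an empty run whenever two consecutive chunks share the same nullness so the alternation survives. A hedged sketch of that encoding, assuming the first run is read as non-null (`append_run` is a hypothetical helper, and the real reader's conventions may differ in detail):

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

// Append `count` values whose nullness is `is_null` to an alternating
// run-length null map: even-indexed runs are non-null, odd-indexed are null.
void append_run(std::vector<uint16_t>& null_map, bool& prev_is_null,
                bool is_null, uint16_t count) {
    if (!(prev_is_null ^ is_null)) {
        null_map.emplace_back(0); // zero-length run keeps the parity intact
    }
    null_map.emplace_back(count);
    prev_is_null = is_null;
}

int main() {
    std::vector<uint16_t> null_map;
    bool prev_is_null = true; // so the very first run is read as non-null
    append_run(null_map, prev_is_null, false, 3); // 3 non-null values
    append_run(null_map, prev_is_null, true, 2);  // 2 nulls
    append_run(null_map, prev_is_null, true, 1);  // same nullness: pad with 0
    for (uint16_t v : null_map) std::printf("%u ", v); // prints: 3 2 0 1
    std::printf("\n");
    return 0;
}
```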
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -141,6 +141,7 @@ class ParquetColumnReader {
     virtual void reset_filter_map_index() = 0;
 
     FieldSchema* get_field_schema() const { return _field_schema; }
+    void set_column_in_nested() { _in_nested = true; }
 
 protected:
     void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const;
@@ -155,6 +156,10 @@ class ParquetColumnReader {
 
     size_t _filter_map_index = 0;
     std::set<uint64_t> _filter_column_ids;
+
+    // _in_nested: column in struct/map/array
+    // IN_COLLECTION: column in map/array
+    bool _in_nested = false;
 };
 
 template <bool IN_COLLECTION, bool OFFSET_INDEX>
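The comment above is the crux of the patch: `_in_nested` marks any column under a struct, map, or array, while the `IN_COLLECTION` template parameter is true only under map/array, where repetition levels exist. A scalar child of a plain struct therefore needs the nested read path (definition levels for the parent's nullability) even though `IN_COLLECTION` is false, which is why `read_column_data` now branches on `if (_in_nested)` instead of `if constexpr (IN_COLLECTION)`. A toy sketch of the relationship, using hypothetical names and considering only the immediate parent for brevity:

```cpp
#include <cassert>

enum class Parent { None, Struct, Map, Array };

struct ColumnTraits {
    bool in_nested;     // runtime flag, set via set_column_in_nested()
    bool in_collection; // compile-time template parameter in the real reader
};

ColumnTraits traits_for(Parent parent) {
    return {parent != Parent::None,
            parent == Parent::Map || parent == Parent::Array};
}

int main() {
    // struct<int>: the int child is nested but has no repetition levels.
    assert(traits_for(Parent::Struct).in_nested);
    assert(!traits_for(Parent::Struct).in_collection);
    // array<int>: the int child is both nested and repeated.
    assert(traits_for(Parent::Array).in_nested);
    assert(traits_for(Parent::Array).in_collection);
    return 0;
}
```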