Skip to content

Commit

Permalink
revert parquet input format for case-insensitive feature
Browse files Browse the repository at this point in the history
  • Loading branch information
taiyang-li committed Apr 3, 2023
1 parent f57ffab commit 2115c51
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 29 deletions.
2 changes: 2 additions & 0 deletions utils/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,10 @@ void BackendInitializerUtil::initSettings()
settings.set("join_use_nulls", true);
settings.set("input_format_orc_allow_missing_columns", true);
settings.set("input_format_orc_case_insensitive_column_matching", true);
settings.set("input_format_orc_import_nested", true);
settings.set("input_format_parquet_allow_missing_columns", true);
settings.set("input_format_parquet_case_insensitive_column_matching", true);
settings.set("input_format_parquet_import_nested", true);
}

void BackendInitializerUtil::initContexts()
Expand Down
1 change: 0 additions & 1 deletion utils/local-engine/Storages/ArrowParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ class ArrowParquetBlockInputFormat : public DB::OptimizedParquetBlockInputFormat
{
public:
ArrowParquetBlockInputFormat(DB::ReadBuffer & in, const DB::Block & header, const DB::FormatSettings & formatSettings, const std::vector<int> & row_group_indices_ = {});
//virtual ~ArrowParquetBlockInputFormat();

private:
DB::Chunk generate() override;
Expand Down
66 changes: 40 additions & 26 deletions utils/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
#include <Formats/FormatSettings.h>
#include <IO/SeekableReadBuffer.h>
#include <Processors/Formats/Impl/ArrowBufferedStreams.h>
#include <Storages/ArrowParquetBlockInputFormat.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <Processors/Formats/Impl/ArrowColumnToCHColumn.h>
#include <Storages/SubstraitSource/ParquetFormatFile.h>

namespace DB
Expand All @@ -27,26 +28,34 @@ FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block
{
auto res = std::make_shared<FormatFile::InputFormat>();
res->read_buffer = std::move(read_buffer_builder->build(file_info));
std::vector<int> row_group_indices;
std::vector<RowGroupInfomation> required_row_groups;
int total_row_groups = 0;
if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer *>(res->read_buffer.get()))
{
// Reuse the read_buffer to avoid opening the file twice;
// opening an HDFS file is especially expensive.
required_row_groups = collectRequiredRowGroups(seekable_in);
required_row_groups = collectRequiredRowGroups(seekable_in, total_row_groups);
seekable_in->seek(0, SEEK_SET);
}
else
{
required_row_groups = collectRequiredRowGroups();
}
required_row_groups = collectRequiredRowGroups(total_row_groups);

std::vector<int> required_row_group_indices;
required_row_group_indices.reserve(required_row_groups.size());
for (const auto & row_group : required_row_groups)
{
row_group_indices.emplace_back(row_group.index);
}
required_row_group_indices.emplace_back(row_group.index);

std::vector<int> total_row_group_indices(total_row_groups);
std::iota(total_row_group_indices.begin(), total_row_group_indices.end(), 0);

std::vector<int> skip_row_group_indices;
std::set_difference(total_row_group_indices.begin(), total_row_group_indices.end(),
required_row_group_indices.begin(), required_row_group_indices.end(),
std::back_inserter(skip_row_group_indices));

auto format_settings = DB::getFormatSettings(context);
format_settings.parquet.import_nested = true;
auto input_format = std::make_shared<local_engine::ArrowParquetBlockInputFormat>(*(res->read_buffer), header, format_settings, row_group_indices);
format_settings.parquet.skip_row_groups = std::unordered_set<int>(skip_row_group_indices.begin(), skip_row_group_indices.end());
auto input_format = std::make_shared<DB::ParquetBlockInputFormat>(*(res->read_buffer), header, format_settings);
res->input = input_format;
return res;
}
Expand All @@ -58,48 +67,53 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
if (total_rows)
return total_rows;
}
auto rowgroups = collectRequiredRowGroups();

int _;
auto rowgroups = collectRequiredRowGroups(_);
size_t rows = 0;
for (const auto & rowgroup : rowgroups)
{
rows += rowgroup.num_rows;
}

{
std::lock_guard lock(mutex);
total_rows = rows;
return total_rows;
}
}

std::vector<RowGroupInfomation> ParquetFormatFile::collectRequiredRowGroups()
std::vector<RowGroupInfomation> ParquetFormatFile::collectRequiredRowGroups(int & total_row_groups)
{
auto in = read_buffer_builder->build(file_info);
return collectRequiredRowGroups(in.get());
return collectRequiredRowGroups(in.get(), total_row_groups);
}

std::vector<RowGroupInfomation> ParquetFormatFile::collectRequiredRowGroups(DB::ReadBuffer * read_buffer)
std::vector<RowGroupInfomation> ParquetFormatFile::collectRequiredRowGroups(DB::ReadBuffer * read_buffer, int & total_row_groups)
{
std::unique_ptr<parquet::arrow::FileReader> reader;
DB::FormatSettings format_settings;
format_settings.seekable_read = true;
DB::FormatSettings format_settings
{
.seekable_read = true,
};
std::atomic<int> is_stopped{0};
std::unique_ptr<parquet::arrow::FileReader> reader;
auto status = parquet::arrow::OpenFile(
asArrowFile(*read_buffer, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES), arrow::default_memory_pool(), &reader);
if (!status.ok())
{
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Open file({}) failed. {}", file_info.uri_file(), status.ToString());
}

auto file_meta = reader->parquet_reader()->metadata();
total_row_groups = file_meta->num_row_groups();

std::vector<RowGroupInfomation> row_group_metadatas;
for (int i = 0, n = file_meta->num_row_groups(); i < n; ++i)
row_group_metadatas.reserve(total_row_groups);
for (int i = 0; i < total_row_groups; ++i)
{
auto row_group_meta = file_meta->RowGroup(i);

auto offset = static_cast<UInt64>(row_group_meta->file_offset());
if (!offset)
{
offset = static_cast<UInt64>(row_group_meta->ColumnChunk(0)->file_offset());
}

/// Keep the row group only when its starting offset lies within the required range [start, start + length).
if (file_info.start() <= offset && offset < file_info.start() + file_info.length())
{
RowGroupInfomation info;
Expand All @@ -108,7 +122,7 @@ std::vector<RowGroupInfomation> ParquetFormatFile::collectRequiredRowGroups(DB::
info.start = row_group_meta->file_offset();
info.total_compressed_size = row_group_meta->total_compressed_size();
info.total_size = row_group_meta->total_byte_size();
row_group_metadatas.emplace_back(info);
row_group_metadatas.emplace_back(std::move(info));
}
}
return row_group_metadatas;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class ParquetFormatFile : public FormatFile
std::mutex mutex;
std::optional<size_t> total_rows;

std::vector<RowGroupInfomation> collectRequiredRowGroups();
std::vector<RowGroupInfomation> collectRequiredRowGroups(DB::ReadBuffer * read_buffer);
std::vector<RowGroupInfomation> collectRequiredRowGroups(int & total_row_groups);
std::vector<RowGroupInfomation> collectRequiredRowGroups(DB::ReadBuffer * read_buffer, int & total_row_groups);
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ NamesAndTypesList OptimizedParquetSchemaReader::readSchema()
return header.getNamesAndTypesList();
}

/*
void registerInputFormatParquet(FormatFactory & factory)
{
factory.registerInputFormat(
Expand Down Expand Up @@ -232,6 +233,7 @@ void registerInputFormatParquet(FormatFactory &)
}
void registerOptimizedParquetSchemaReader(FormatFactory &) {}
*/
}

#endif

0 comments on commit 2115c51

Please sign in to comment.