Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kssenii committed Sep 29, 2023
1 parent 5057a31 commit a7f389a
Showing 1 changed file with 70 additions and 32 deletions.
102 changes: 70 additions & 32 deletions src/Storages/DataLakes/IcebergMetadataParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,12 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
*/
void processManifestList(Metadata & metadata, const Configuration & configuration, ContextPtr context)
{
static constexpr auto manifest_path = "manifest_path";

auto buf = MetadataReadHelper::createReadBuffer(metadata.manifest_list, context, configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buf));

auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0));
auto columns = parseAvro(*file_reader, data_type, manifest_path, getFormatSettings(context));
Block header{{data_type->createColumn(), data_type, "manifest_path"}};
auto columns = parseAvro(*file_reader, header, getFormatSettings(context));
auto & col = columns.at(0);

if (col->getDataType() != TypeIndex::String)
Expand Down Expand Up @@ -207,9 +206,7 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
*/
Strings getFilesForRead(const Metadata & metadata, const Configuration & configuration, ContextPtr context)
{
static constexpr auto manifest_path = "data_file";

Strings keys;
NameSet keys;
for (const auto & manifest_file : metadata.manifest_files)
{
auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration);
Expand All @@ -226,59 +223,100 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
root_node->leaves(), expected_min_num);
}

avro::NodePtr status_node = root_node->leafAt(0);
if (status_node->type() != avro::Type::AVRO_INT)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `status` field should be Int type, got {}",
magic_enum::enum_name(status_node->type()));
}

avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1);
if (data_file_node->type() != avro::Type::AVRO_RECORD)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
data_file_node->type());
magic_enum::enum_name(data_file_node->type()));
}
auto data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
const auto columns = parseAvro(*file_reader, data_type, manifest_path, getFormatSettings(context));
const auto col_tuple = typeid_cast<ColumnTuple *>(columns.at(0).get());

ColumnPtr col_str;
if (metadata.format_version == 1)
col_str = col_tuple->getColumnPtr(0);
else
col_str = col_tuple->getColumnPtr(1);
auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node);
auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
Block header{
{status_col_data_type->createColumn(), status_col_data_type, "status"},
{data_col_data_type->createColumn(), data_col_data_type, "data_file"}};

if (col_str->getDataType() != TypeIndex::String)
const auto columns = parseAvro(*file_reader, header, getFormatSettings(context));
if (columns.size() != 2)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be String type, got {}",
col_str->getFamilyName());
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Unexpected number of columns. Expected 2, got {}", columns.size());
}

if (columns.at(0)->getDataType() != TypeIndex::Int32)
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `status` field should be Int32 type, got {}",
columns.at(0)->getFamilyName());
}
if (columns.at(1)->getDataType() != TypeIndex::Tuple)
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be Tuple type, got {}",
columns.at(1)->getFamilyName());
}

const auto status_int_column = assert_cast<ColumnInt32 *>(columns.at(0).get());
const auto data_file_tuple_column = assert_cast<ColumnTuple *>(columns.at(1).get());

if (status_int_column->size() != data_file_tuple_column->size())
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` and `status` have different rows number: {} and {}",
status_int_column->size(), data_file_tuple_column->size());
}

const auto * str_col = assert_cast<const ColumnString *>(col_str.get());
for (size_t i = 0; i < str_col->size(); ++i)
const auto * data_file_name_column = metadata.format_version == 1
? data_file_tuple_column->getColumnPtr(0).get()
: data_file_tuple_column->getColumnPtr(1).get();

if (data_file_name_column->getDataType() != TypeIndex::String)
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `file_path` field should be String type, got {}",
data_file_name_column->getFamilyName());
}
auto file_name_str_column = assert_cast<const ColumnString *>(data_file_name_column);

for (size_t i = 0; i < status_int_column->size(); ++i)
{
const auto data_path = std::string(str_col->getDataAt(i).toView());
const auto status = status_int_column->getInt(i);
const auto data_path = std::string(file_name_str_column->getDataAt(i).toView());
const auto pos = data_path.find(configuration.url.key);
const auto file_path = data_path.substr(pos);
if (pos == std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path);
keys.emplace_back(data_path.substr(pos));

if (status == 2)
keys.erase(file_path);
else
keys.insert(file_path);
}
}

return keys;
return std::vector<std::string>(keys.begin(), keys.end());
}

MutableColumns parseAvro(
avro::DataFileReaderBase & file_reader,
const DataTypePtr & data_type,
const String & field_name,
const Block & header,
const FormatSettings & settings)
{
auto deserializer = std::make_unique<AvroDeserializer>(
Block{{data_type->createColumn(), data_type, field_name}}, file_reader.dataSchema(), true, true, settings);
auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
MutableColumns columns = header.cloneEmptyColumns();

file_reader.init();
MutableColumns columns;
columns.emplace_back(data_type->createColumn());

RowReadExtension ext;
while (file_reader.hasMore())
{
Expand Down

0 comments on commit a7f389a

Please sign in to comment.