Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ TColumnConverter BuildColumnConverter(const std::string& columnName, const std::
return conv;
}

if (originalType->id() == arrow::Type::DATE32) {
if (originalType->id() == arrow::Type::DATE32) {
// TODO: support more than 1 optional level
bool isOptional = false;
auto unpackedYqlType = UnpackOptional(yqlType, isOptional);
Expand Down Expand Up @@ -1541,6 +1541,20 @@ class TS3ReadCoroImpl : public TActorCoroImpl {

void BuildColumnConverters(std::shared_ptr<arrow::Schema> outputSchema, std::shared_ptr<arrow::Schema> dataSchema,
std::vector<int>& columnIndices, std::vector<TColumnConverter>& columnConverters) {

for (int i = 0; i < dataSchema->num_fields(); ++i) {
switch (dataSchema->field(i)->type()->id()) {
case arrow::Type::LIST:
throw parquet::ParquetException(TStringBuilder() << "File contains LIST field "
<< dataSchema->field(i)->name() << " and can't be parsed");
case arrow::Type::STRUCT:
throw parquet::ParquetException(TStringBuilder() << "File contains STRUCT field "
<< dataSchema->field(i)->name() << " and can't be parsed");
default:
;
}
}

columnConverters.reserve(outputSchema->num_fields());
for (int i = 0; i < outputSchema->num_fields(); ++i) {
const auto& targetField = outputSchema->field(i);
Expand All @@ -1556,9 +1570,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}
columnIndices.push_back(srcFieldIndex);
auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
if (rowSpecColumnIt == ReadSpec->RowSpec.end()) {
throw parquet::ParquetException(TStringBuilder() << "Column " << targetField->name() << " not found in row spec");
}
YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
}
}
Expand Down
Binary file added ydb/tests/fq/s3/test_format_data/btct.parquet
Binary file not shown.
37 changes: 37 additions & 0 deletions ydb/tests/fq/s3/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,43 @@ def test_format(self, kikimr, s3, client, filename, type_format, yq_version):
if type_format != "json_list":
assert stat["ResultSet"]["IngressRows"]["sum"] == 3

@yq_all
def test_btc(self, kikimr, s3, client, yq_version):
    # Regression test: reading a parquet file that contains a nested LIST
    # column ("outputs" in btct.parquet) must fail with a clear, structured
    # error rather than being misparsed or crashing the S3 read actor.
    self.create_bucket_and_upload_file("btct.parquet", s3, kikimr)
    client.create_storage_connection("btct", "fbucket")

    # The declared SCHEMA lists only supported scalar columns; the failure
    # is expected to come from the unsupported LIST field present in the
    # file's own schema, not from the columns requested here.
    # Plain string literal (not an f-string): there are no placeholders.
    sql = '''
PRAGMA s3.UseBlocksSource="true";
SELECT
*
FROM btct.`btct.parquet`
WITH (format=`parquet`,
SCHEMA=(
hash STRING,
version INT64,
size INT64,
block_hash UTF8,
block_number INT64,
virtual_size INT64,
lock_time INT64,
input_count INT64,
output_count INT64,
is_coinbase BOOL,
output_value DOUBLE,
block_timestamp DATETIME,
date DATE
)
);
'''

    query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
    # The query must reach the FAILED terminal status; then inspect the
    # nested issue tree for the expected outer and inner error messages.
    client.wait_query_status(query_id, fq.QueryMeta.FAILED)
    describe_result = client.describe_query(query_id).result
    issues = describe_result.query.issue[0].issues
    assert "Error while reading file btct.parquet" in issues[0].message
    assert "File contains LIST field outputs and can't be parsed" in issues[0].issues[0].message


@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_invalid_format(self, kikimr, s3, client):
Expand Down