Skip to content

Commit

Permalink
YQ-3011 fix parquet type validation (ydb-platform#3595)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored and kardymonds committed Jun 10, 2024
1 parent 04e41f0 commit e4fc932
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ bool ExtractSettingValue(const TExprNode& value, TStringBuf settingName, TString

}

bool EnsureParquetTypeSupported(TPositionHandle position, const TTypeAnnotationNode* type, TExprContext& ctx, const IArrowResolver::TPtr& arrowResolver) {
auto resolveStatus = arrowResolver->AreTypesSupported(ctx.GetPosition(position), { type }, ctx);
YQL_ENSURE(resolveStatus != IArrowResolver::ERROR);

if (resolveStatus != IArrowResolver::OK) {
ctx.AddError(TIssue(ctx.GetPosition(position), TStringBuilder() << "Type " << *type << " is not supported for parquet"));
return false;
}

return true;
}

class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
public:
TS3DataSourceTypeAnnotationTransformer(TS3State::TPtr state)
Expand Down Expand Up @@ -407,7 +419,8 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

if (!TS3Object::Match(input->Child(TS3ReadObject::idx_Object))) {
const auto& objectNode = input->Child(TS3ReadObject::idx_Object);
if (!TS3Object::Match(objectNode)) {
ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3ReadObject::idx_Object)->Pos()), "Expected S3 object."));
return TStatus::Error;
}
Expand Down Expand Up @@ -467,6 +480,19 @@ class TS3DataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
return TStatus::Error;
}

if (objectNode->Child(TS3Object::idx_Format)->Content() == "parquet") {
YQL_ENSURE(State_->Types->ArrowResolver);
bool allTypesSupported = true;
for (const auto& item : rowType->Cast<TStructExprType>()->GetItems()) {
if (!EnsureParquetTypeSupported(input->Pos(), item->GetItemType(), ctx, State_->Types->ArrowResolver)) {
allTypesSupported = false;
}
}
if (!allTypesSupported) {
return TStatus::Error;
}
}

input->SetTypeAnn(ctx.MakeType<TTupleExprType>(TTypeAnnotationNode::TListType{
input->Child(TS3ReadObject::idx_World)->GetTypeAnn(),
ctx.MakeType<TListExprType>(rowType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,15 +259,6 @@ class TS3DqIntegration: public TDqIntegrationBase {

auto format = s3ReadObject.Object().Format().Ref().Content();
if (const auto useCoro = State_->Configuration->SourceCoroActor.Get(); (!useCoro || *useCoro) && format != "raw" && format != "json_list") {
if (format == "parquet") {
YQL_ENSURE(State_->Types->ArrowResolver);
TVector<const TTypeAnnotationNode*> allTypes;
for (const auto& x : rowType->Cast<TStructExprType>()->GetItems()) {
allTypes.push_back(x->GetItemType());
}
auto resolveStatus = State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(read->Pos()), allTypes, ctx);
YQL_ENSURE(resolveStatus == IArrowResolver::OK);
}
return Build<TDqSourceWrap>(ctx, read->Pos())
.Input<TS3ParseSettings>()
.Paths(s3ReadObject.Object().Paths())
Expand Down

0 comments on commit e4fc932

Please sign in to comment.