Skip to content

Commit

Permalink
YQ RD pass UV from parser to filter (ydb-platform#10721)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Oct 29, 2024
1 parent e08e74c commit 50f04ff
Show file tree
Hide file tree
Showing 10 changed files with 499 additions and 247 deletions.
87 changes: 45 additions & 42 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <ydb/library/yql/providers/common/schema/parser/yql_type_parser.h>
#include <ydb/library/yql/public/udf/udf_version.h>
#include <ydb/library/yql/public/purecalc/purecalc.h>
#include <ydb/library/yql/public/purecalc/io_specs/mkql/spec.h>
Expand All @@ -10,7 +11,6 @@
#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/row_dispatcher/json_filter.h>


namespace {

using TCallback = NFq::TJsonFilter::TCallback;
Expand All @@ -23,6 +23,12 @@ NYT::TNode CreateTypeNode(const TString& fieldType) {
.Add(fieldType);
}

NYT::TNode CreateOptionalTypeNode(const TString& fieldType) {
return NYT::TNode::CreateList()
.Add("OptionalType")
.Add(CreateTypeNode(fieldType));
}

void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
node.Add(
NYT::TNode::CreateList()
Expand All @@ -31,18 +37,29 @@ void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldTy
);
}

void AddOptionalField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) {
node.Add(NYT::TNode::CreateList()
.Add(fieldName)
.Add(NYT::TNode::CreateList().Add("OptionalType").Add(CreateTypeNode(fieldType)))
void AddTypedField(NYT::TNode& node, const TString& fieldName, const TString& fieldTypeYson) {
NYT::TNode parsedType;
Y_ENSURE(NYql::NCommon::ParseYson(parsedType, fieldTypeYson, Cerr), "Invalid field type");

// TODO: remove this when the re-parsing is removed from pq read actor
if (parsedType == CreateTypeNode("Json")) {
parsedType = CreateTypeNode("String");
} else if (parsedType == CreateOptionalTypeNode("Json")) {
parsedType = CreateOptionalTypeNode("String");
}

node.Add(
NYT::TNode::CreateList()
.Add(fieldName)
.Add(parsedType)
);
}

NYT::TNode MakeInputSchema(const TVector<TString>& columns) {
NYT::TNode MakeInputSchema(const TVector<TString>& columns, const TVector<TString>& types) {
auto structMembers = NYT::TNode::CreateList();
AddField(structMembers, OffsetFieldName, "Uint64");
for (const auto& col : columns) {
AddOptionalField(structMembers, col, "String");
for (size_t i = 0; i < columns.size(); ++i) {
AddTypedField(structMembers, columns[i], types[i]);
}
return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers));
}
Expand All @@ -68,7 +85,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
TVector<NYT::TNode> Schemas;
};

class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>> {
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>> {
public:
TFilterInputConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -106,15 +123,15 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T
}
}

void OnObject(std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&> values) override {
void OnObject(std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&> values) override {
Y_ENSURE(FieldsPositions.size() == values.second.size());

NKikimr::NMiniKQL::TThrowingBindTerminator bind;
with_lock (Worker->GetScopedAlloc()) {
auto& holderFactory = Worker->GetGraph().GetHolderFactory();

// TODO: use blocks here
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
for (size_t rowId = 0; rowId < values.second.front()->size(); ++rowId) {
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
Expand All @@ -126,13 +143,16 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<const T

size_t fieldId = 0;
for (const auto& column : values.second) {
items[FieldsPositions[fieldId++]] = column[rowId].data() // Check that std::string_view was initialized in json_parser
? NKikimr::NMiniKQL::MakeString(column[rowId]).MakeOptional()
: NKikimr::NUdf::TUnboxedValuePod();
items[FieldsPositions[fieldId++]] = column->at(rowId);
}

Worker->Push(std::move(result));
}

// Clear cache after each object because
// values allocated on another allocator and should be released
Cache.Clear();
Worker->GetGraph().Invalidate();
}
}

Expand Down Expand Up @@ -216,7 +236,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
static constexpr bool IsPartial = false;
static constexpr bool SupportPushStreamMode = true;

using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>>;
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>>;

static TConsumerType MakeConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -244,12 +264,15 @@ class TJsonFilter::TImpl {
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback)
: Sql(GenerateSql(columns, types, whereFilter)) {
: Sql(GenerateSql(whereFilter)) {
Y_ENSURE(columns.size() == types.size(), "Number of columns and types should by equal");
auto factory = NYql::NPureCalc::MakeProgramFactory(NYql::NPureCalc::TProgramFactoryOptions());

// Program should be stateless because input values
// allocated on another allocator and should be released
LOG_ROW_DISPATCHER_DEBUG("Creating program...");
Program = factory->MakePushStreamProgram(
TFilterInputSpec(MakeInputSchema(columns)),
TFilterInputSpec(MakeInputSchema(columns, types)),
TFilterOutputSpec(MakeOutputSchema()),
Sql,
NYql::NPureCalc::ETranslationMode::SQL
Expand All @@ -258,7 +281,7 @@ class TJsonFilter::TImpl {
LOG_ROW_DISPATCHER_DEBUG("Program created");
}

void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
Y_ENSURE(values, "Expected non empty schema");
InputConsumer->OnObject(std::make_pair(offsets, values));
}
Expand All @@ -268,29 +291,9 @@ class TJsonFilter::TImpl {
}

private:
TString GenerateSql(const TVector<TString>& columnNames, const TVector<TString>& columnTypes, const TString& whereFilter) {
TString GenerateSql(const TString& whereFilter) {
TStringStream str;
str << "$fields = SELECT ";
Y_ABORT_UNLESS(columnNames.size() == columnTypes.size());
str << OffsetFieldName << ", ";
for (size_t i = 0; i < columnNames.size(); ++i) {
TString columnType = columnTypes[i];
TString columnName = NFq::EncloseAndEscapeString(columnNames[i], '`');
if (columnType == "Json") {
columnType = "String";
} else if (columnType == "Optional<Json>") {
columnType = "Optional<String>";
}

if (columnType.StartsWith("Optional")) {
str << "IF(" << columnName << " IS NOT NULL, Unwrap(CAST(" << columnName << " as " << columnType << ")), NULL)";
} else {
str << "Unwrap(CAST(" << columnName << " as " << columnType << "))";
}
str << " as " << columnName << ((i != columnNames.size() - 1) ? "," : "");
}
str << " FROM Input;\n";
str << "$filtered = SELECT * FROM $fields " << whereFilter << ";\n";
str << "$filtered = SELECT * FROM Input " << whereFilter << ";\n";

str << "SELECT " << OffsetFieldName << ", Unwrap(Json::SerializeJson(Yson::From(RemoveMembers(TableRow(), [\"" << OffsetFieldName;
str << "\"])))) as data FROM $filtered";
Expand All @@ -300,7 +303,7 @@ class TJsonFilter::TImpl {

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<TVector<std::string_view>>&>>> InputConsumer;
THolder<NYql::NPureCalc::IConsumer<std::pair<const TVector<ui64>&, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>&>>> InputConsumer;
const TString Sql;
};

Expand All @@ -315,7 +318,7 @@ TJsonFilter::TJsonFilter(
TJsonFilter::~TJsonFilter() {
}

void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values) {
void TJsonFilter::Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values) {
Impl->Push(offsets, values);
}

Expand Down
5 changes: 2 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>

namespace NFq {

Expand All @@ -18,7 +17,7 @@ class TJsonFilter {

~TJsonFilter();

void Push(const TVector<ui64>& offsets, const TVector<TVector<std::string_view>>& values);
void Push(const TVector<ui64>& offsets, const TVector<const NKikimr::NMiniKQL::TUnboxedValueVector*>& values);
TString GetSql();

private:
Expand Down
Loading

0 comments on commit 50f04ff

Please sign in to comment.