Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 39 additions & 32 deletions ydb/core/kqp/gateway/behaviour/view/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,31 @@ TString GetByKeyOrDefault(const NYql::TCreateObjectSettings& container, const TS
return value ? *value : TString{};
}

void CheckFeatureFlag(TInternalModificationContext& context) {
TYqlConclusionStatus CheckFeatureFlag(TInternalModificationContext& context) {
auto* const actorSystem = context.GetExternalData().GetActorSystem();
if (!actorSystem) {
ythrow TSystemError() << "This place needs an actor system. Please contact internal support";
ythrow yexception() << "This place needs an actor system. Please contact internal support";
}
if (!AppData(actorSystem)->FeatureFlags.GetEnableViews()) {
ythrow TSystemError() << "Views are disabled. Please contact your system administrator to enable it";
return AppData(actorSystem)->FeatureFlags.GetEnableViews()
? TYqlConclusionStatus::Success()
: TYqlConclusionStatus::Fail("Views are disabled. Please contact your system administrator to enable the feature");
}

std::pair<TString, TString> SplitPathByDb(const TString& objectId,
const TString& database) {
std::pair<TString, TString> pathPair;
TString error;
if (!TrySplitPathByDb(objectId, database, pathPair, error)) {
ythrow TBadArgumentException() << error;
}
return pathPair;
}

void FillCreateViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme,
const NYql::TCreateObjectSettings& settings,
TInternalModificationContext& context) {
const NYql::TCreateObjectSettings& settings,
const TString& database) {

std::pair<TString, TString> pathPair;
{
TString error;
if (!TrySplitPathByDb(settings.GetObjectId(), context.GetExternalData().GetDatabase(), pathPair, error)) {
ythrow TBadArgumentException() << error;
}
}
const auto pathPair = SplitPathByDb(settings.GetObjectId(), database);
modifyScheme.SetWorkingDir(pathPair.first);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateView);

Expand All @@ -51,15 +55,9 @@ void FillCreateViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme,

void FillDropViewProposal(NKikimrSchemeOp::TModifyScheme& modifyScheme,
const NYql::TDropObjectSettings& settings,
TInternalModificationContext& context) {
const TString& database) {

std::pair<TString, TString> pathPair;
{
TString error;
if (!TrySplitPathByDb(settings.GetObjectId(), context.GetExternalData().GetDatabase(), pathPair, error)) {
ythrow TBadArgumentException() << error;
}
}
const auto pathPair = SplitPathByDb(settings.GetObjectId(), database);
modifyScheme.SetWorkingDir(pathPair.first);
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropView);

Expand Down Expand Up @@ -92,7 +90,7 @@ NThreading::TFuture<TYqlConclusionStatus> CreateView(const NYql::TCreateObjectSe
proposal->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken());
}
auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme();
FillCreateViewProposal(schemeTx, settings, context);
FillCreateViewProposal(schemeTx, settings, context.GetExternalData().GetDatabase());

return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), true);
}
Expand All @@ -105,21 +103,21 @@ NThreading::TFuture<TYqlConclusionStatus> DropView(const NYql::TDropObjectSettin
proposal->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken());
}
auto& schemeTx = *proposal->Record.MutableTransaction()->MutableModifyScheme();
FillDropViewProposal(schemeTx, settings, context);
FillDropViewProposal(schemeTx, settings, context.GetExternalData().GetDatabase());

return SendSchemeRequest(proposal.Release(), context.GetExternalData().GetActorSystem(), false);
}

void PrepareCreateView(NKqpProto::TKqpSchemeOperation& schemeOperation,
const NYql::TObjectSettingsImpl& settings,
TInternalModificationContext& context) {
FillCreateViewProposal(*schemeOperation.MutableCreateView(), settings, context);
FillCreateViewProposal(*schemeOperation.MutableCreateView(), settings, context.GetExternalData().GetDatabase());
}

void PrepareDropView(NKqpProto::TKqpSchemeOperation& schemeOperation,
const NYql::TObjectSettingsImpl& settings,
TInternalModificationContext& context) {
FillDropViewProposal(*schemeOperation.MutableDropView(), settings, context);
FillDropViewProposal(*schemeOperation.MutableDropView(), settings, context.GetExternalData().GetDatabase());
}

}
Expand All @@ -129,21 +127,28 @@ NThreading::TFuture<TYqlConclusionStatus> TViewManager::DoModify(const NYql::TOb
const NMetadata::IClassBehaviour::TPtr& manager,
TInternalModificationContext& context) const {
Y_UNUSED(nodeId, manager);
const auto makeFuture = [](const TYqlConclusionStatus& status) {
return NThreading::MakeFuture<TYqlConclusionStatus>(status);
};

try {
CheckFeatureFlag(context);
if (const auto status = CheckFeatureFlag(context); status.IsFail()) {
return makeFuture(status);
}
switch (context.GetActivityType()) {
case EActivityType::Alter:
return makeFuture(TYqlConclusionStatus::Fail("Alter operation for VIEW objects is not implemented"));
case EActivityType::Upsert:
return makeFuture(TYqlConclusionStatus::Fail("Upsert operation for VIEW objects is not implemented"));
case EActivityType::Undefined:
ythrow TBadArgumentException() << "not implemented";
return makeFuture(TYqlConclusionStatus::Fail("Undefined operation for a VIEW object"));
case EActivityType::Create:
return CreateView(settings, context);
case EActivityType::Drop:
return DropView(settings, context);
}
} catch (...) {
return NThreading::MakeFuture<TYqlConclusionStatus>(TYqlConclusionStatus::Fail(CurrentExceptionMessage()));
return makeFuture(TYqlConclusionStatus::Fail(CurrentExceptionMessage()));
}
}

Expand All @@ -154,14 +159,16 @@ TViewManager::TYqlConclusionStatus TViewManager::DoPrepare(NKqpProto::TKqpScheme
Y_UNUSED(manager);

try {
CheckFeatureFlag(context);
if (const auto status = CheckFeatureFlag(context); status.IsFail()) {
return status;
}
switch (context.GetActivityType()) {
case EActivityType::Undefined:
ythrow TBadArgumentException() << "Undefined operation for a VIEW object";
return TYqlConclusionStatus::Fail("Undefined operation for a VIEW object");
case EActivityType::Upsert:
ythrow TBadArgumentException() << "Upsert operation for VIEW objects is not implemented";
return TYqlConclusionStatus::Fail("Upsert operation for VIEW objects is not implemented");
case EActivityType::Alter:
ythrow TBadArgumentException() << "Alter operation for VIEW objects is not implemented";
return TYqlConclusionStatus::Fail("Alter operation for VIEW objects is not implemented");
case EActivityType::Create:
PrepareCreateView(schemeOperation, settings, context);
break;
Expand Down
36 changes: 34 additions & 2 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,29 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
return result;
}

TTableMetadataResult GetViewMetadataResult(
const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry,
const TString& cluster,
const TString& viewName
) {
const auto& description = schemeEntry.ViewInfo->Description;

TTableMetadataResult builtResult;
builtResult.SetSuccess();

builtResult.Metadata = new NYql::TKikimrTableMetadata(cluster, viewName);
auto metadata = builtResult.Metadata;
metadata->DoesExist = true;
metadata->PathId = NYql::TKikimrPathId(description.GetPathId().GetOwnerId(),
description.GetPathId().GetLocalId());
metadata->SchemaVersion = description.GetVersion();
metadata->Kind = NYql::EKikimrTableKind::View;
metadata->Attributes = schemeEntry.Attributes;
metadata->ViewPersistedData = {description.GetQueryText()};

return builtResult;
}

TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
const TString& cluster, const TString& tableName, std::optional<TString> queryName = std::nullopt) {
using TResult = NYql::IKikimrGateway::TTableMetadataResult;
Expand Down Expand Up @@ -306,7 +329,11 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
return ResultFromError<TResult>(ToString(entry.Status));
}

YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindColumnTable || entry.Kind == EKind::KindExternalTable || entry.Kind == EKind::KindExternalDataSource);
YQL_ENSURE(IsIn({EKind::KindTable,
EKind::KindColumnTable,
EKind::KindExternalTable,
EKind::KindExternalDataSource,
EKind::KindView}, entry.Kind));

TTableMetadataResult result;
switch (entry.Kind) {
Expand All @@ -316,6 +343,9 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
case EKind::KindExternalDataSource:
result = GetExternalDataSourceMetadataResult(entry, cluster, tableName);
break;
case EKind::KindView:
result = GetViewMetadataResult(entry, cluster, tableName);
break;
default:
result = GetTableMetadataResult(entry, cluster, tableName, queryName);
}
Expand Down Expand Up @@ -715,7 +745,9 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
return;
}

if (!IsIn({EKind::KindExternalDataSource, EKind::KindExternalTable}, entry.Kind) && expectedSchemaVersion && entry.TableId.SchemaVersion) {
if (!IsIn({EKind::KindExternalDataSource,
EKind::KindExternalTable,
EKind::KindView}, entry.Kind) && expectedSchemaVersion && entry.TableId.SchemaVersion) {
if (entry.TableId.SchemaVersion != expectedSchemaVersion) {
const auto message = TStringBuilder()
<< "schema version mismatch during metadata loading for: "
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/opt/logical/kqp_opt_log_ranges_predext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ TMaybeNode<TExprBase> TryBuildTrivialReadTable(TCoFlatMap& flatmap, TKqlReadTabl
case EKikimrTableKind::External:
case EKikimrTableKind::Unspecified:
return {};
case EKikimrTableKind::View:
YQL_ENSURE(false, "All views should have been rewritten at this stage.");
}

auto row = flatmap.Lambda().Args().Arg(0);
Expand Down
157 changes: 157 additions & 0 deletions ydb/core/kqp/provider/rewrite_io_utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#include "rewrite_io_utils.h"

#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/sql/sql.h>
#include <ydb/library/yql/utils/log/log.h>

namespace NYql {
namespace {

using namespace NNodes;

constexpr const char* QueryGraphNodeSignature = "SavedQueryGraph";

NSQLTranslation::TTranslationSettings CreateViewTranslationSettings(const TString& cluster) {
NSQLTranslation::TTranslationSettings settings;

settings.DefaultCluster = cluster;
settings.ClusterMapping[cluster] = TString(NYql::KikimrProviderName);
settings.Mode = NSQLTranslation::ESqlMode::LIMITED_VIEW;

return settings;
}

TExprNode::TPtr CompileViewQuery(
const TString& query,
TExprContext& ctx,
const TString& cluster
) {
TAstParseResult queryAst;
queryAst = NSQLTranslation::SqlToYql(query, CreateViewTranslationSettings(cluster));

ctx.IssueManager.AddIssues(queryAst.Issues);
if (!queryAst.IsOk()) {
return nullptr;
}

TExprNode::TPtr queryGraph;
if (!CompileExpr(*queryAst.Root, queryGraph, ctx, nullptr, nullptr)) {
return nullptr;
}

return queryGraph;
}

void AddChild(const TExprNode::TPtr& parent, const TExprNode::TPtr& newChild) {
auto childrenToChange = parent->ChildrenList();
childrenToChange.emplace_back(newChild);
parent->ChangeChildrenInplace(std::move(childrenToChange));
}

TExprNode::TPtr FindSavedQueryGraph(const TExprNode::TPtr& carrier) {
if (carrier->ChildrenSize() == 0) {
return nullptr;
}
auto lastChild = carrier->Children().back();
return lastChild->IsCallable(QueryGraphNodeSignature) ? lastChild->ChildPtr(0) : TExprNode::TPtr();
}

void SaveQueryGraph(const TExprNode::TPtr& carrier, TExprContext& ctx, const TExprNode::TPtr& payload) {
AddChild(carrier, ctx.NewCallable(payload->Pos(), QueryGraphNodeSignature, {payload}));
}

void InsertExecutionOrderDependencies(
TExprNode::TPtr& queryGraph,
const TExprNode::TPtr& worldBefore,
TExprContext& ctx
) {
const auto initialWorldOfTheQuery = FindNode(queryGraph, [](const TExprNode::TPtr& node) {
return node->IsWorld();
});
if (!initialWorldOfTheQuery) {
return;
}
queryGraph = ctx.ReplaceNode(std::move(queryGraph), *initialWorldOfTheQuery, worldBefore);
}

bool CheckTopLevelness(const TExprNode::TPtr& candidateRead, const TExprNode::TPtr& queryGraph) {
THashSet<TExprNode::TPtr> readsInCandidateSubgraph;
VisitExpr(candidateRead, [&readsInCandidateSubgraph](const TExprNode::TPtr& node) {
if (node->IsCallable(ReadName)) {
readsInCandidateSubgraph.emplace(node);
}
return true;
});

return !FindNode(queryGraph, [&readsInCandidateSubgraph](const TExprNode::TPtr& node) {
return node->IsCallable(ReadName) && !readsInCandidateSubgraph.contains(node);
});
}

TExprNode::TPtr FindTopLevelRead(const TExprNode::TPtr& queryGraph) {
const TExprNode::TPtr* lastReadInTopologicalOrder = nullptr;
VisitExpr(
queryGraph,
nullptr,
[&lastReadInTopologicalOrder](const TExprNode::TPtr& node) {
if (node->IsCallable(ReadName)) {
lastReadInTopologicalOrder = &node;
}
return true;
}
);

if (!lastReadInTopologicalOrder) {
return nullptr;
}

YQL_ENSURE(CheckTopLevelness(*lastReadInTopologicalOrder, queryGraph),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is a good idea. This message is for developers, not for the user. However, I would like to check this assumption in the code somehow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, if I messed up in choosing the correct Read! node in this function, then the user will get an even more cryptic error message:

"Failed to execute callable with name: ResWrite!, you possibly used cross provider/cluster operations or pulled not materialized result in refselect mode"

This message appears whenever KQP failed to substitute some ResWrite! node to a TKiExecDataQuery! node, which usually happens when there are nodes with side-effects (like Read!) not seen on the path by the first child from the root of the query graph (see CheckTx function).

"Info for developers: assumption that there is only one top level Read! is wrong "
"for the expression graph of the query stored in the view:\n"
<< queryGraph->Dump());

return *lastReadInTopologicalOrder;
}

}

TExprNode::TPtr RewriteReadFromView(
const TExprNode::TPtr& node,
TExprContext& ctx,
const TString& query,
const TString& cluster
) {
const TCoRead readNode(node->ChildPtr(0));
const auto worldBeforeThisRead = readNode.World().Ptr();

TExprNode::TPtr queryGraph = FindSavedQueryGraph(readNode.Ptr());
if (!queryGraph) {
queryGraph = CompileViewQuery(query, ctx, cluster);
if (!queryGraph) {
ctx.AddError(TIssue(ctx.GetPosition(readNode.Pos()),
"The query stored in the view cannot be compiled."));
return nullptr;
}
YQL_CLOG(TRACE, ProviderKqp) << "Expression graph of the query stored in the view:\n"
<< NCommon::ExprToPrettyString(ctx, *queryGraph);

InsertExecutionOrderDependencies(queryGraph, worldBeforeThisRead, ctx);
SaveQueryGraph(readNode.Ptr(), ctx, queryGraph);
}

if (node->IsCallable(RightName)) {
return queryGraph;
}

const auto topLevelRead = FindTopLevelRead(queryGraph);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should at least combine InsertExecutionOrderDependencies and FindTopLevelRead methods. They serve the same purpose: get world topology roots and leafs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the them (InsertExecutionOrderDependencies) is needed for both rewrites (rewrite of the Left! call to the read node and rewrite of the Right! call to the read node), while the other one (FindTopLevelRead) is needed only for the rewrite of the Left! call the read node. In addition, one of them does change the query graph, while the other is only searching for a node in it. I would like to keep them separated.

if (!topLevelRead) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is that possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is actually pretty easy to have a query stored in a view that does not contain any reads. For example,

SELECT 1

return worldBeforeThisRead;
}
return Build<TCoLeft>(ctx, node->Pos()).Input(topLevelRead).Done().Ptr();
}

}
14 changes: 14 additions & 0 deletions ydb/core/kqp/provider/rewrite_io_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <ydb/library/yql/ast/yql_expr.h>

namespace NYql {

TExprNode::TPtr RewriteReadFromView(
const TExprNode::TPtr& node,
TExprContext& ctx,
const TString& query,
const TString& cluster
);

}
Loading