Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 31 additions & 38 deletions ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1786,59 +1786,52 @@ class TYtNativeGateway : public IYtGateway {
auto typeAttrFilter = TAttributeFilter().AddAttribute("type").AddAttribute("_yql_type").AddAttribute("broken");
auto nodeList = entry->Tx->List(prefix,
TListOptions().AttributeFilter(typeAttrFilter));

TVector<std::variant<TString, std::exception_ptr>> types(Reserve(nodeList.size()));
TVector<
std::pair<
TString, //name
std::variant<TString, std::exception_ptr> //type or exception
>
> items(nodeList.size());
{
auto batchGet = entry->Tx->CreateBatchRequest();
TVector<TFuture<void>> batchRes;
for (size_t i: xrange(nodeList.size())) {
auto& node = nodeList[i];
auto type = GetAttrType(node);
if (type == "link") {
types.emplace_back(type);
const auto& node = nodeList[i];
items[i].first = node.AsString();
items[i].second = GetTypeFromNode(node, true);
if (std::get<TString>(items[i].second) == "link") {
if (!node.GetAttributes().HasKey("broken") || !node.GetAttributes()["broken"].AsBool()) {
batchRes.push_back(batchGet->Get(prefix + "/" + node.AsString(), TGetOptions().AttributeFilter(typeAttrFilter))
.Apply([i, &types] (const TFuture<NYT::TNode>& f) {
try {
types[i] = GetAttrType(f.GetValue());
} catch (...) {
types[i] = std::current_exception();
}
}));
batchRes.push_back(batchGet->Get(prefix + "/" + node.AsString() + "/@", TGetOptions().AttributeFilter(typeAttrFilter))
.Apply([i, &items](const TFuture<NYT::TNode> &f) {
try {
items[i].second = GetTypeFromAttributes(f.GetValue(), true);
} catch (...) {
items[i].second = std::current_exception();
}
}));
}
} else {
types.push_back(type);
}
}
batchGet->ExecuteBatch();
WaitExceptionOrAll(batchRes).GetValue();
}

names.reserve(types.size());
errors.reserve(types.size());
for (size_t i: xrange(nodeList.size())) {
auto& node = nodeList[i];
if (auto type = std::get_if<TString>(&types[i])) {
if (TStringBuf("map_node") == *type && !suffix.empty()) {
names.push_back(node.AsString());
names.reserve(items.size());
errors.reserve(items.size());
for (const auto& item: items) {
if (const auto* type = std::get_if<TString>(&item.second)) {
if (
(suffix.empty() && ("table" == *type || "view" == *type)) ||
(!suffix.empty() && "map_node" == *type)
) {
names.push_back(item.first);
errors.emplace_back();
} else if (TStringBuf("table") == *type && suffix.empty()) {
names.push_back(node.AsString());
errors.emplace_back();
} else if (TStringBuf("document") == *type && suffix.empty()) {
if (node.HasAttributes()) {
auto& attrs = node.GetAttributes();
if (attrs.HasKey("_yql_type") && attrs["_yql_type"].AsString() == "view") {
names.push_back(node.AsString());
errors.emplace_back();
}
}
}
} else {
auto exptr = std::get<std::exception_ptr>(types[i]);
auto exptr = std::get<std::exception_ptr>(item.second);
if (filterLambda) {
// Delayed error processing
names.push_back(node.AsString());
names.push_back(item.first);
errors.push_back(std::move(exptr));
} else {
std::rethrow_exception(exptr);
Expand Down Expand Up @@ -2391,7 +2384,7 @@ class TYtNativeGateway : public IYtGateway {
for (auto& idx: idxs) {
batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@", getOpts).Apply([idx, &attributes](const TFuture<NYT::TNode>& f) {
NYT::TNode attrs = f.GetValue();
if (GetType(attrs) == "link") {
if (GetTypeFromAttributes(attrs, false) == "link") {
// override some attributes by the link ones
if (attrs.HasKey(QB2Premapper)) {
attributes[idx.first][QB2Premapper] = attrs[QB2Premapper];
Expand Down Expand Up @@ -2419,7 +2412,7 @@ class TYtNativeGateway : public IYtGateway {
TYtTableStatInfo::TPtr statInfo = MakeIntrusive<TYtTableStatInfo>();
result.Data[idx.first].Stat = statInfo;

auto type = GetType(attrs);
auto type = GetTypeFromAttributes(attrs, false);
ui16 viewSyntaxVersion = 1;
if (type == "document") {
if (attrs.HasKey(YqlTypeAttribute)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,26 @@ using namespace NKikimr::NMiniKQL;
using namespace NNodes;
using namespace NThreading;

TString GetType(const NYT::TNode& attr) {
if (!attr.HasKey("type")) {
TString GetTypeFromAttributes(const NYT::TNode& attributes, bool getDocumentType) {
if (!attributes.HasKey("type")) {
return "unknown";
}

return attr["type"].AsString();
const auto type = attributes["type"].AsString();
if (getDocumentType && "document" == type) {
if (!attributes.HasKey("_yql_type")) {
return "unknown";
}
return attributes["_yql_type"].AsString();
} else {
return type;
}
}

TString GetAttrType(const NYT::TNode& node) {
TString GetTypeFromNode(const NYT::TNode& node, bool getDocumentType) {
if (!node.HasAttributes()) {
return "unknown";
}

return GetType(node.GetAttributes());
return GetTypeFromAttributes(node.GetAttributes(), getDocumentType);
}

TMaybe<TVector<IYtGateway::TBatchFolderResult::TFolderItem>> MaybeGetFolderFromCache(TTransactionCache::TEntry::TPtr entry, TStringBuf cacheKey) {
Expand Down Expand Up @@ -83,7 +89,7 @@ IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& nod
}
item.Attributes[attr.first] = attr.second;
}
item.Type = GetAttrType(node);
item.Type = GetTypeFromNode(node, false);
item.Path = path.StartsWith(NYT::TConfig::Get()->Prefix)
? path.substr(NYT::TConfig::Get()->Prefix.size())
: path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

namespace NYql::NNative {

TString GetType(const NYT::TNode& attr);
TString GetTypeFromAttributes(const NYT::TNode& attr, bool getDocumentType);

TString GetAttrType(const NYT::TNode& node);
TString GetTypeFromNode(const NYT::TNode& node, bool getDocumentType);

TMaybe<TVector<IYtGateway::TBatchFolderResult::TFolderItem>> MaybeGetFolderFromCache(TTransactionCache::TEntry::TPtr entry, TStringBuf cacheKey);

Expand Down