Skip to content

Commit

Permalink
Merge 77408b4 into ee48ce2
Browse files Browse the repository at this point in the history
  • Loading branch information
StekPerepolnen authored Apr 16, 2024
2 parents ee48ce2 + 77408b4 commit 90f8c91
Show file tree
Hide file tree
Showing 4 changed files with 363 additions and 91 deletions.
209 changes: 122 additions & 87 deletions ydb/core/viewer/json_autocomplete.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/base/tablet.h>
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/viewer/protos/viewer.pb.h>
#include <ydb/core/viewer/json/json.h>
#include "viewer.h"
#include "query_autocomplete_helper.h"
Expand All @@ -13,6 +14,7 @@ namespace NViewer {

using namespace NActors;
using TNavigate = NSchemeCache::TSchemeCacheNavigate;
using TEntity = NKikimrViewer::TQueryAutocomplete_TResult_TEntity;

class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
using TBase = TViewerPipeClient<TJsonAutocomplete>;
Expand All @@ -22,25 +24,27 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
TJsonSettings JsonSettings;
ui32 Timeout = 0;

TAutoPtr<TEvViewer::TEvViewerResponse> ProxyResult;
TAutoPtr<NConsole::TEvConsole::TEvListTenantsResponse> ConsoleResult;
TAutoPtr<TEvTxProxySchemeCache::TEvNavigateKeySetResult> CacheResult;

struct SchemaWordData {
struct TSchemaWordData {
TString Name;
TString Type;
TEntity::EType Type;
TString Table;
SchemaWordData() {}
SchemaWordData(const TString& name, const TString& type, const TString& table = "")
TSchemaWordData() {}
TSchemaWordData(const TString& name, const TEntity::EType type, const TString& table = "")
: Name(name)
, Type(type)
, Table(table)
{}
};
THashMap<TString, SchemaWordData> Dictionary;
THashMap<TString, TSchemaWordData> Dictionary;
TString Database;
TVector<TString> Tables;
TVector<TString> Paths;
TString Prefix;
TString PrefixPath;
TString SearchWord;
ui32 Limit = 10;
NKikimrViewer::TQueryAutocomplete Result;

Expand All @@ -63,6 +67,7 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
TStringBuf content = Event->Get()->Request.GetPostContent();
ParsePostContent(content);
}
PrepareParameters();
}

// proxied request
Expand All @@ -72,43 +77,49 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
auto& request = ViewerRequest->Get()->Record.GetAutocompleteRequest();

Database = request.GetDatabase();
for (auto& path: request.GetTables()) {
Paths.emplace_back(path);
for (auto& table: request.GetTables()) {
Tables.emplace_back(table);
}
Prefix = request.GetPrefix();

Timeout = ViewerRequest->Get()->Record.GetTimeout();
Direct = true;
PrepareParameters();
}

void PreparePaths() {
if (Paths.size() == 0) {
Paths.emplace_back("");
}
TString prefixPath = "";
auto splitPos = Prefix.find_last_of('/');
if (splitPos != std::string::npos) {
prefixPath += "/" + Prefix.substr(0, splitPos);
Prefix = Prefix.substr(splitPos + 1);
}
for (TString& path: Paths) {
if (!path.StartsWith(Database)) {
path = Database + "/" + path;
}
path += prefixPath;
void PrepareParameters() {
if (Database) {
TString prefixUpToLastSlash = "";
auto splitPos = Prefix.find_last_of('/');
if (splitPos != std::string::npos) {
path += "/" + Prefix.substr(0, splitPos);
Prefix = Prefix.substr(splitPos + 1);
prefixUpToLastSlash += Prefix.substr(0, splitPos);
SearchWord = Prefix.substr(splitPos + 1);
} else {
SearchWord = Prefix;
}

if (Tables.size() == 0) {
Paths.emplace_back(Database);
} else {
for (TString& table: Tables) {
TString path = table;
if (!table.StartsWith(Database)) {
path = Database + "/" + path;
}
path += "/" + prefixUpToLastSlash;
Paths.emplace_back(path);
}
}
} else {
SearchWord = Prefix;
}
}

void ParseCgiParameters(const TCgiParameters& params) {
JsonSettings.EnumAsNumbers = !FromStringWithDefault<bool>(params.Get("enums"), true);
JsonSettings.UI64AsString = !FromStringWithDefault<bool>(params.Get("ui64"), false);
Database = params.Get("database");
StringSplitter(params.Get("table")).Split(',').SkipEmpty().Collect(&Paths);
StringSplitter(params.Get("table")).Split(',').SkipEmpty().Collect(&Tables);
Prefix = params.Get("prefix");
Limit = FromStringWithDefault<ui32>(params.Get("limit"), Limit);
Direct = FromStringWithDefault<bool>(params.Get("direct"), Direct);
Expand All @@ -121,8 +132,10 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
bool success = NJson::ReadJsonTree(content, &JsonConfig, &requestData);
if (success) {
Database = Database.empty() ? requestData["database"].GetStringSafe({}) : Database;
for (auto& table: requestData["tables"].GetArraySafe()) {
Paths.emplace_back(table.GetStringSafe());
if (requestData["table"].IsArray()) {
for (auto& table: requestData["table"].GetArraySafe()) {
Tables.emplace_back(table.GetStringSafe());
}
}
Prefix = Prefix.empty() ? requestData["prefix"].GetStringSafe({}) : Prefix;
}
Expand Down Expand Up @@ -157,38 +170,41 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {

void Bootstrap() {
if (ViewerRequest) {
// proxied request
PreparePaths();
// handle proxied request
SendSchemeCacheRequest();
} else if (!Database) {
// autocomplete databases via console request
// autocomplete database list via console request
RequestConsoleListTenants();
} else {
if (!Direct) {
// autocomplete with proxy
RequestStateStorageEndpointsLookup(Database); // to find some dynamic node and redirect there
// proxy request to a dynamic node of the specified database
RequestStateStorageEndpointsLookup(Database);
}
if (Requests == 0) {
// autocomplete without proxy
PreparePaths();
// perform autocomplete without proxying
SendSchemeCacheRequest();
}
}


Become(&TThis::StateRequestedDescribe, TDuration::MilliSeconds(Timeout), new TEvents::TEvWakeup());
}

void Connected(TEvInterconnect::TEvNodeConnected::TPtr &) {}

void Undelivered(TEvents::TEvUndelivered::TPtr &ev) {
if (ev->Get()->SourceType == NViewer::TEvViewer::EvViewerRequest) {
SendSchemeCacheRequest();
if (!Direct && ev->Get()->SourceType == NViewer::TEvViewer::EvViewerRequest) {
Direct = true;
SendSchemeCacheRequest(); // fallback
RequestDone();
}
}

void Disconnected(TEvInterconnect::TEvNodeDisconnected::TPtr &) {
SendSchemeCacheRequest();
if (!Direct) {
Direct = true;
SendSchemeCacheRequest(); // fallback
RequestDone();
}
}

void Handle(TEvStateStorage::TEvBoardInfo::TPtr& ev) {
Expand All @@ -203,6 +219,7 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
} else {
SendDynamicNodeAutocompleteRequest();
}
RequestDone();
}

void SendSchemeCacheRequest() {
Expand Down Expand Up @@ -253,90 +270,106 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}
}

TString ConvertType(TNavigate::EKind navigate) {
TEntity::EType ConvertType(TNavigate::EKind navigate) {
switch (navigate) {
case TNavigate::KindSubdomain:
return "subdomain";
return TEntity::Subdomain;
case TNavigate::KindPath:
return "directory";
return TEntity::Directory;
case TNavigate::KindExtSubdomain:
return "database";
return TEntity::Database;
case TNavigate::KindTable:
return "table";
return TEntity::Table;
case TNavigate::KindOlapStore:
return "columnStore";
return TEntity::ColumnStore;
case TNavigate::KindColumnTable:
return "columnTable";
return TEntity::ColumnTable;
case TNavigate::KindRtmr:
return "rtmrVolume";
return TEntity::RtmrVolume;
case TNavigate::KindKesus:
return "kesus";
return TEntity::Kesus;
case TNavigate::KindSolomon:
return "solomonVolume";
return TEntity::SolomonVolume;
case TNavigate::KindTopic:
return "persQueueGroup";
return TEntity::PersQueueGroup;
case TNavigate::KindCdcStream:
return "cdcStream";
return TEntity::CdcStream;
case TNavigate::KindSequence:
return "sequence";
return TEntity::Sequence;
case TNavigate::KindReplication:
return "replication";
return TEntity::Replication;
case TNavigate::KindBlobDepot:
return "blobDepot";
return TEntity::BlobDepot;
case TNavigate::KindExternalTable:
return "externalTable";
return TEntity::ExternalTable;
case TNavigate::KindExternalDataSource:
return "externalDataSource";
return TEntity::ExternalDataSource;
case TNavigate::KindBlockStoreVolume:
return "blockStoreVolume";
return TEntity::BlockStoreVolume;
case TNavigate::KindFileStore:
return "fileStore";
return TEntity::FileStore;
case TNavigate::KindView:
return "view";
return TEntity::View;
default:
return "directory";
return TEntity::Directory;
}
}

void ParseProxyResult() {
if (ProxyResult == nullptr) {
Result.add_error("Failed to collect information from ProxyResult");
return;
}
if (ProxyResult->Record.HasAutocompleteResponse()) {
Result = ProxyResult->Record.GetAutocompleteResponse();
} else {
Result.add_error("Proxying return empty response");
}

}

void ParseConsoleResult() {
if (ConsoleResult == nullptr) {
Result.add_error("Failed to collect information");
Result.add_error("Failed to collect information from ConsoleResult");
return;
}

Ydb::Cms::ListDatabasesResult listTenantsResult;
ConsoleResult->Record.GetResponse().operation().result().UnpackTo(&listTenantsResult);
for (const TString& path : listTenantsResult.paths()) {
Dictionary[path] = SchemaWordData(path, "database");
Dictionary[path] = TSchemaWordData(path, TEntity::Database);
}
RequestDone();
}

void ParseCacheResult() {
if (CacheResult == nullptr) {
Result.add_error("Failed to collect information");
Result.add_error("Failed to collect information from CacheResult");
return;
}
NSchemeCache::TSchemeCacheNavigate *navigate = CacheResult->Request.Get();
if (navigate->ErrorCount > 0) {
Result.add_error("Inner errors while collected information");
for (auto& entry: CacheResult->Request.Get()->ResultSet) {
if (entry.Status != TSchemeCacheNavigate::EStatus::Ok) {
Result.add_error(TStringBuilder() << "Error receiving Navigate response: `" << CanonizePath(entry.Path) << "` has <" << ToString(entry.Status) << "> status");
}
}
return;
}
for (auto& entry: CacheResult->Request.Get()->ResultSet) {
TString path = CanonizePath(entry.Path);
if (entry.ListNodeEntry) {
for (const auto& child : entry.ListNodeEntry->Children) {
Dictionary[child.Name] = SchemaWordData(child.Name, ConvertType(child.Kind), path);
Dictionary[child.Name] = TSchemaWordData(child.Name, ConvertType(child.Kind), path);
}
};
for (const auto& [id, column] : entry.Columns) {
Dictionary[column.Name] = SchemaWordData(column.Name, path, "column");
Dictionary[column.Name] = TSchemaWordData(column.Name, TEntity::Column, path);
}
for (const auto& index : entry.Indexes) {
Dictionary[index.GetName()] = SchemaWordData(index.GetName(), path, "index");
Dictionary[index.GetName()] = TSchemaWordData(index.GetName(), TEntity::Index, path);
}
for (const auto& cdcStream : entry.CdcStreams) {
Dictionary[cdcStream.GetName()] = SchemaWordData(cdcStream.GetName(), path, "cdcstream");
Dictionary[cdcStream.GetName()] = TSchemaWordData(cdcStream.GetName(), TEntity::CdcStream, path);
}
}
}
Expand Down Expand Up @@ -364,23 +397,27 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}

void ReplyAndPassAway() {
if (!Database) {
ParseConsoleResult();
} else {
if (ProxyResult) {
ParseProxyResult();
} else if (Database) {
ParseCacheResult();
} else {
ParseConsoleResult();
}

Result.set_success(Result.error_size() == 0);
if (Result.error_size() == 0) {
auto fuzzy = FuzzySearcher<SchemaWordData>(Dictionary);
auto autocomplete = fuzzy.Search(Prefix, Limit);
Result.MutableResult()->SetTotal(autocomplete.size());
for (SchemaWordData& wordData: autocomplete) {
auto entity = Result.MutableResult()->AddEntities();
entity->SetName(wordData.Name);
entity->SetType(wordData.Type);
if (wordData.Table) {
entity->SetParent(wordData.Table);
if (!ProxyResult) {
Result.set_success(Result.error_size() == 0);
if (Result.error_size() == 0) {
auto fuzzy = FuzzySearcher<TSchemaWordData>(Dictionary);
auto autocomplete = fuzzy.Search(SearchWord, Limit);
Result.MutableResult()->SetTotal(autocomplete.size());
for (TSchemaWordData& wordData: autocomplete) {
auto entity = Result.MutableResult()->AddEntities();
entity->SetName(wordData.Name);
entity->SetType(wordData.Type);
if (wordData.Table) {
entity->SetParent(wordData.Table);
}
}
}
}
Expand All @@ -390,10 +427,8 @@ class TJsonAutocomplete : public TViewerPipeClient<TJsonAutocomplete> {
}

void Handle(TEvViewer::TEvViewerResponse::TPtr& ev) {
if (ev.Get()->Get()->Record.HasAutocompleteResponse()) {
Result = ev.Get()->Get()->Record.GetAutocompleteResponse();
}
SendAutocompleteResponse();
ProxyResult = ev.Release()->Release();
RequestDone();
}

void HandleTimeout() {
Expand Down
Loading

0 comments on commit 90f8c91

Please sign in to comment.