Skip to content

Commit

Permalink
Fq gather stable 24-1 (ydb-platform#3517)
Browse files Browse the repository at this point in the history
Co-authored-by: Ilnaz Nizametdinov <ilnaz@ydb.tech>
Co-authored-by: kruall <kruall@ydb.tech>
Co-authored-by: Alexander Petrukhin <shmel1k@ydb.tech>
Co-authored-by: Vitalii Gridnev <gridnevvvit@gmail.com>
Co-authored-by: vporyadke <zalyalov@ydb.tech>
Co-authored-by: Nikolay Shestakov <tesseract@ydb.tech>
Co-authored-by: niksaveliev <nik@saveliev.me>
Co-authored-by: Vitaly Isaev <vitalyisaev@ydb.tech>
Co-authored-by: uzhastik <uzhas@ydb.tech>
Co-authored-by: Pisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com>
Co-authored-by: Yaroslav Plishan <80714170+MetaGigachad@users.noreply.github.com>
Co-authored-by: Hor911 <hor911@ydb.tech>
Co-authored-by: Dmitry Kardymon <kardymon-d@ydb.tech>
Co-authored-by: Oleg Doronin <dorooleg@yandex.ru>
Co-authored-by: Ivan Blinkov <ivan@blinkov.ru>
Co-authored-by: uzhastik <uzhastik@gmail.com>
Co-authored-by: Daniil Cherednik <dcherednik@ydb.tech>
Co-authored-by: Andrey Kulaga <aakulaga@ydb.tech>
  • Loading branch information
19 people committed Jun 10, 2024
1 parent 26640da commit 04e41f0
Show file tree
Hide file tree
Showing 321 changed files with 7,132 additions and 3,863 deletions.
8 changes: 5 additions & 3 deletions .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
ydb/core/kafka_proxy/ut KafkaProtocol.CreatePartitionsScenario
ydb/core/kafka_proxy/ut KafkaProtocol.ProduceScenario
ydb/core/kqp/provider/ut KikimrIcGateway.TestLoadBasicSecretValueFromExternalDataSourceMetadata
ydb/core/kqp/ut/federated_query/generic *
ydb/core/kqp/ut/olap *
ydb/core/kqp/ut/olap KqpOlap.IndexesActualization
ydb/core/kqp/ut/olap KqpOlap.BlobsSharing*
ydb/core/kqp/ut/olap KqpOlap.ScanQueryOltpAndOlap
ydb/core/kqp/ut/olap KqpOlap.StatsUsageWithTTL
ydb/core/kqp/ut/olap KqpOlap.YqlScriptOltpAndOlap
ydb/core/kqp/ut/pg KqpPg.CreateIndex
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
Expand All @@ -29,7 +32,6 @@ ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter
ydb/core/kqp/ut/scheme [44/50]*
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
ydb/core/kqp/ut/service KqpQueryServiceScripts.ForgetScriptExecutionRace
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
ydb/core/kqp/ut/service [38/50]*
ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.ForgetAfterFail
Expand Down
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ struct TKikimrEvents : TEvents {
ES_GRAPH,
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE,
ES_S3_FILE_QUEUE,
};
};

Expand Down
11 changes: 6 additions & 5 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
const NKikimrExternalSources::TGeneral& general) const override {
NKikimrExternalSources::TObjectStorage objectStorage;
for (const auto& [key, value]: general.attributes()) {
if (key == "format") {
auto lowerKey = to_lower(key);
if (lowerKey == "format") {
objectStorage.set_format(value);
} else if (key == "compression") {
} else if (lowerKey == "compression") {
objectStorage.set_compression(value);
} else if (key.StartsWith("projection.") || key == "storage.location.template") {
objectStorage.mutable_projection()->insert({key, value});
} else if (key == "partitioned_by") {
} else if (lowerKey == "partitioned_by") {
auto json = NSc::TValue::FromJsonThrow(value);
for (const auto& column: json.GetArray()) {
*objectStorage.add_partitioned_by() = column;
}
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "csv_delimiter"sv}, key)) {
objectStorage.mutable_format_setting()->insert({key, value});
} else if (IsIn({"file_pattern"sv, "data.interval.unit"sv, "data.datetime.format_name"sv, "data.datetime.format"sv, "data.timestamp.format_name"sv, "data.timestamp.format"sv, "csv_delimiter"sv}, lowerKey)) {
objectStorage.mutable_format_setting()->insert({lowerKey, value});
} else {
ythrow TExternalSourceException() << "Unknown attribute " << key;
}
Expand Down
15 changes: 6 additions & 9 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,14 @@ void AddClustersFromConnections(
switch (conn.content().setting().connection_case()) {
case FederatedQuery::ConnectionSetting::kYdbDatabase: {
const auto& db = conn.content().setting().ydb_database();
auto* clusterCfg = gatewaysConfig.MutableYdb()->AddClusterMapping();
auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping();
clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::YDB);
clusterCfg->SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
clusterCfg->SetName(connectionName);
clusterCfg->SetId(db.database_id());
if (db.database())
clusterCfg->SetDatabase(db.database());
if (db.endpoint())
clusterCfg->SetEndpoint(db.endpoint());
clusterCfg->SetSecure(db.secure());
clusterCfg->SetAddBearerToToken(common.GetUseBearerForYdb());
clusterCfg->SetDatabaseId(db.database_id());
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures);
clusters.emplace(connectionName, YdbProviderName);
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kClickhouseCluster: {
Expand Down
104 changes: 55 additions & 49 deletions ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "database_resolver.h"

#include <util/string/split.h>
#include <ydb/core/fq/libs/common/cache.h>
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
#include <ydb/core/fq/libs/events/events.h>
Expand Down Expand Up @@ -98,8 +99,6 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
}

void DieOnTtl() {
Success = false;

auto errorMsg = TStringBuilder() << "Could not resolve database ids: ";
bool firstUnresolvedDbId = true;
for (const auto& [_, params]: Requests) {
Expand All @@ -112,46 +111,41 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
}
errorMsg << " in " << ResolvingTtl << " seconds.";
LOG_E("ResponseProcessor::DieOnTtl: errorMsg=" << errorMsg);

SendResolvedEndpointsAndDie(errorMsg);
Issues.AddIssue(errorMsg);
SendResolvedEndpointsAndDie();
}

void SendResolvedEndpointsAndDie(const TString& errorMsg) {
NYql::TIssues issues;
if (errorMsg) {
issues.AddIssue(errorMsg);
}

void SendResolvedEndpointsAndDie() {
Send(Sender,
new TEvents::TEvEndpointResponse(
NYql::TDatabaseResolverResponse(std::move(DatabaseId2Description), Success, issues)));
NYql::TDatabaseResolverResponse(std::move(DatabaseId2Description), Issues.Empty(), Issues)));
PassAway();
LOG_D("ResponseProcessor::SendResolvedEndpointsAndDie: passed away");
}

void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev)
{
TString errorMessage;
TMaybe<TDatabaseDescription> result;
const auto requestIter = Requests.find(ev->Get()->Request);
HandledIds++;

LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got MDB API response: code=" << ev->Get()->Response->Status);
LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got API response: code=" << ev->Get()->Response->Status);

try {
HandleResponse(ev, requestIter, errorMessage, result);
HandleResponse(ev, requestIter, result);
} catch (...) {
const TString msg = TStringBuilder() << "error while response processing, params "
<< ((requestIter != Requests.end()) ? requestIter->second.ToDebugString() : TString{"unknown"})
<< ", details: " << CurrentExceptionMessage();
LOG_E("ResponseProccessor::Handle(TEvHttpIncomingResponse): " << msg);
Issues.AddIssue(msg);
}

LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): progress: "
<< DatabaseId2Description.size() << " of " << Requests.size() << " requests are done");

if (HandledIds == Requests.size()) {
SendResolvedEndpointsAndDie(errorMessage);
SendResolvedEndpointsAndDie();
}
}

Expand All @@ -160,18 +154,25 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
void HandleResponse(
NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev,
const TRequestMap::const_iterator& requestIter,
TString& errorMessage,
TMaybe<TDatabaseDescription>& result)
{
if (ev->Get()->Error.empty() && (ev->Get()->Response && ev->Get()->Response->Status == "200")) {
errorMessage = HandleSuccessfulResponse(ev, requestIter, result);
{
TString errorMessage;

if (requestIter == Requests.end()) {
// Requests are guaranteed to be kept in within TResponseProcessor until the response arrives.
// If there is no appropriate request, it's a fatal error.
errorMessage = "Invariant violation: unknown request";
} else {
errorMessage = HandleFailedResponse(ev, requestIter);
if (ev->Get()->Error.empty() && (ev->Get()->Response && ev->Get()->Response->Status == "200")) {
errorMessage = HandleSuccessfulResponse(ev, *requestIter, result);
} else {
errorMessage = HandleFailedResponse(ev, *requestIter);
}
}

if (errorMessage) {
Issues.AddIssue(errorMessage);
LOG_E("ResponseProcessor::Handle(HttpIncomingResponse): error=" << errorMessage);
Success = false;
} else {
const auto& params = requestIter->second;
auto key = std::make_tuple(params.Id, params.DatabaseType, params.DatabaseAuth);
Expand All @@ -191,17 +192,13 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>

TString HandleSuccessfulResponse(
NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev,
const TRequestMap::const_iterator& requestIter,
const TRequestMap::value_type& requestWithParams,
TMaybe<TDatabaseDescription>& result
) {
if (requestIter == Requests.end()) {
return "unknown request";
}

NJson::TJsonReaderConfig jsonConfig;
NJson::TJsonValue databaseInfo;

const auto& params = requestIter->second;
const auto& params = requestWithParams.second;
const bool parseJsonOk = NJson::ReadJsonTree(ev->Get()->Response->Body, &jsonConfig, &databaseInfo);
TParsers::const_iterator parserIt;
if (parseJsonOk && (parserIt = Parsers.find(params.DatabaseType)) != Parsers.end()) {
Expand Down Expand Up @@ -232,37 +229,37 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>

TString HandleFailedResponse(
NHttp::TEvHttpProxy::TEvHttpIncomingResponse::TPtr& ev,
const TRequestMap::const_iterator& requestIter
const TRequestMap::value_type& requestWithParams
) const {
if (requestIter == Requests.end()) {
return "unknown request";
}
auto sb = TStringBuilder()
<< "Error while trying to resolve managed " << ToString(requestWithParams.second.DatabaseType)
<< " database with id " << requestWithParams.second.Id << " via HTTP request to"
<< ": endpoint '" << requestWithParams.first->Host << "'"
<< ", url '" << requestWithParams.first->URL << "'"
<< ": ";

// Handle network error (when the response is empty)
if (!ev->Get()->Response) {
return sb << ev->Get()->Error;
}

// Handle unauthenticated error
const auto& status = ev->Get()->Response->Status;

if (status == "403") {
return TStringBuilder() << "You have no permission to resolve database id into database endpoint. " + DetailedPermissionsError(requestIter->second);
return sb << "you have no permission to resolve database id into database endpoint." + DetailedPermissionsError(requestWithParams.second);
}

auto errorMessage = ev->Get()->Error;

const TString error = TStringBuilder()
<< "Cannot resolve database id (status = " << status << "). "
<< "Response body from " << ev->Get()->Request->URL << ": " << (ev->Get()->Response ? ev->Get()->Response->Body : "empty");
if (!errorMessage.empty()) {
errorMessage += '\n';
}
errorMessage += error;

return errorMessage;
// Unexpected error. Add response body for debug
return sb << Endl
<< "Status: " << status << Endl
<< "Response body: " << ev->Get()->Response->Body;
}


TString DetailedPermissionsError(const TResolveParams& params) const {

if (params.DatabaseType == EDatabaseType::ClickHouse || params.DatabaseType == EDatabaseType::PostgreSQL) {
auto mdbTypeStr = NYql::DatabaseTypeLowercase(params.DatabaseType);
return TStringBuilder() << "Please check that your service account has role " <<
return TStringBuilder() << " Please check that your service account has role " <<
"`managed-" << mdbTypeStr << ".viewer`.";
}
return {};
Expand All @@ -275,7 +272,7 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
const NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
TDatabaseResolverResponse::TDatabaseDescriptionMap DatabaseId2Description;
size_t HandledIds = 0;
bool Success = true;
NYql::TIssues Issues;
const TParsers& Parsers;
TDuration ResolvingTtl = TDuration::Seconds(30); //TODO: Use cfg
};
Expand Down Expand Up @@ -312,7 +309,12 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
}

Y_ENSURE(endpoint);
return TDatabaseDescription{endpoint, "", 0, database, secure};

TVector<TString> split = StringSplitter(endpoint).Split(':');

Y_ENSURE(split.size() == 2);

return TDatabaseDescription{endpoint, split[0], FromString(split[1]), database, secure};
};
Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](
Expand All @@ -327,9 +329,11 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
ret.Endpoint[2] = 's';
ret.Host[2] = 's';
}
if (isDedicatedDb) {
ret.Endpoint = "u-" + ret.Endpoint;
ret.Host = "u-" + ret.Host;
}
return ret;
};
Expand Down Expand Up @@ -486,6 +490,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
try {
TString url;
if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, databaseType)) {
YQL_ENSURE(ev->Get()->YdbMvpEndpoint.Size() > 0, "empty YDB MVP Endpoint");
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
Expand All @@ -497,7 +502,6 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
.AddPathComponent("hosts")
.Build();
}
LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << url);

NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url);

Expand All @@ -507,6 +511,8 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
httpRequest->Set("Authorization", token);
}

LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << "url: " << httpRequest->URL);

requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth};
} catch (const std::exception& e) {
const TString msg = TStringBuilder() << "error while preparing to resolve database id: " << databaseId
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/fq/libs/actors/proxy_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ namespace NKikimr {

namespace NFq {

NActors::TActorId MakeYqPrivateProxyId();

NActors::IActor* CreateYqlAnalyticsPrivateProxy(
const NConfig::TPrivateProxyConfig& privateProxyConfig,
TIntrusivePtr<ITimeProvider> timeProvider,
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/task_meta/task_meta.h>
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
Expand Down Expand Up @@ -1940,11 +1939,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
}

{
dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
}

{
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver));
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory));
}

{
Expand Down
Loading

0 comments on commit 04e41f0

Please sign in to comment.