Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TStri
{
ToString(NYql::EDatabaseType::PostgreSQL),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"MDB_BASIC", "BASIC"}, {"database_name", "protocol", "mdb_cluster_id", "use_tls", "schema"}, hostnamePatternsRegEx)
},
{
ToString(NYql::EDatabaseType::Ydb),
CreateExternalDataSource(TString{NYql::GenericProviderName}, {"BASIC", "SERVICE_ACCOUNT"}, {"database_name", "use_tls", "database_id"}, hostnamePatternsRegEx)
}
});
}
Expand Down
15 changes: 6 additions & 9 deletions ydb/core/fq/libs/actors/clusters_from_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,14 @@ void AddClustersFromConnections(
switch (conn.content().setting().connection_case()) {
case FederatedQuery::ConnectionSetting::kYdbDatabase: {
const auto& db = conn.content().setting().ydb_database();
auto* clusterCfg = gatewaysConfig.MutableYdb()->AddClusterMapping();
auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping();
clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::YDB);
clusterCfg->SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE);
clusterCfg->SetName(connectionName);
clusterCfg->SetId(db.database_id());
if (db.database())
clusterCfg->SetDatabase(db.database());
if (db.endpoint())
clusterCfg->SetEndpoint(db.endpoint());
clusterCfg->SetSecure(db.secure());
clusterCfg->SetAddBearerToToken(common.GetUseBearerForYdb());
clusterCfg->SetDatabaseId(db.database_id());
clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources());
FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures);
clusters.emplace(connectionName, YdbProviderName);
clusters.emplace(connectionName, GenericProviderName);
break;
}
case FederatedQuery::ConnectionSetting::kClickhouseCluster: {
Expand Down
16 changes: 13 additions & 3 deletions ydb/core/fq/libs/actors/database_resolver.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "database_resolver.h"

#include <util/string/split.h>
#include <ydb/core/fq/libs/common/cache.h>
#include <ydb/core/fq/libs/config/protos/issue_id.pb.h>
#include <ydb/core/fq/libs/events/events.h>
Expand Down Expand Up @@ -128,7 +129,7 @@ class TResponseProcessor : public TActorBootstrapped<TResponseProcessor>
const auto requestIter = Requests.find(ev->Get()->Request);
HandledIds++;

LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got MDB API response: code=" << ev->Get()->Response->Status);
LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got API response: code=" << ev->Get()->Response->Status);

try {
HandleResponse(ev, requestIter, result);
Expand Down Expand Up @@ -305,7 +306,12 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
}

Y_ENSURE(endpoint);
return TDatabaseDescription{endpoint, "", 0, database, secure};

TVector<TString> split = StringSplitter(endpoint).Split(':');

Y_ENSURE(split.size() == 2);

return TDatabaseDescription{endpoint, split[0], FromString(split[1]), database, secure};
};
Parsers[NYql::EDatabaseType::Ydb] = ydbParser;
Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser](
Expand All @@ -320,9 +326,11 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) {
// Replace "ydb." -> "yds."
ret.Endpoint[2] = 's';
ret.Host[2] = 's';
}
if (isDedicatedDb) {
ret.Endpoint = "u-" + ret.Endpoint;
ret.Host = "u-" + ret.Host;
}
return ret;
};
Expand Down Expand Up @@ -479,6 +487,7 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
try {
TString url;
if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, databaseType)) {
YQL_ENSURE(ev->Get()->YdbMvpEndpoint.Size() > 0, "empty YDB MVP Endpoint");
url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database")
.AddUrlParam("databaseId", databaseId)
.Build();
Expand All @@ -490,7 +499,6 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
.AddPathComponent("hosts")
.Build();
}
LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << url);

NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url);

Expand All @@ -500,6 +508,8 @@ class TDatabaseResolver: public TActor<TDatabaseResolver>
httpRequest->Set("Authorization", token);
}

LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << "url: " << httpRequest->URL);

requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth};
} catch (const std::exception& e) {
const TString msg = TStringBuilder() << "error while preparing to resolve database id: " << databaseId
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/task_meta/task_meta.h>
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
#include <ydb/library/yql/providers/solomon/gateway/yql_solomon_gateway.h>
#include <ydb/library/yql/providers/solomon/provider/yql_solomon_provider.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
Expand Down Expand Up @@ -1940,11 +1939,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
}

{
dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver));
}

{
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver));
dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory));
}

{
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"ydb.serverless.yandexcloud.net:2135"},
TString{""},
0,
TString{"ydb.serverless.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
Expand All @@ -195,8 +195,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"yds.serverless.yandexcloud.net:2135"},
TString{""},
0,
TString{"yds.serverless.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
Expand All @@ -217,8 +217,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) {
})",
NYql::TDatabaseResolverResponse::TDatabaseDescription{
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"},
TString{""},
0,
TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net"},
2135,
TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"),
true
},
Expand Down
1 change: 0 additions & 1 deletion ydb/core/fq/libs/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ PEERDIR(
ydb/library/yql/providers/pq/provider
ydb/library/yql/providers/pq/task_meta
ydb/library/yql/providers/s3/provider
ydb/library/yql/providers/ydb/provider
ydb/library/yql/public/issue
ydb/library/yql/public/issue/protos
ydb/library/yql/sql/settings
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/fq/libs/compute/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ class TComputeConfig {
case FederatedQuery::ConnectionSetting::kObjectStorage:
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
case FederatedQuery::ConnectionSetting::kPostgresqlCluster:
return true;
case FederatedQuery::ConnectionSetting::kYdbDatabase:
return true;
case FederatedQuery::ConnectionSetting::kDataStreams:
case FederatedQuery::ConnectionSetting::kMonitoring:
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,14 @@ TString MakeCreateExternalDataSourceQuery(
switch (connectionContent.setting().connection_case()) {
case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET:
case FederatedQuery::ConnectionSetting::kYdbDatabase:
properties = fmt::format(
R"(
SOURCE_TYPE="Ydb",
DATABASE_ID={database_id},
USE_TLS="{use_tls}"
)",
"database_id"_a = EncloseAndEscapeString(connectionContent.setting().ydb_database().database_id(), '"'),
"use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true");
break;
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
properties = fmt::format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ FederatedQuery::IamAuth::IdentityCase GetIamAuth(const FederatedQuery::Connectio
const auto& setting = connection.content().setting();
switch (setting.connection_case()) {
case FederatedQuery::ConnectionSetting::kYdbDatabase:
return setting.data_streams().auth().identity_case();
return setting.ydb_database().auth().identity_case();
case FederatedQuery::ConnectionSetting::kClickhouseCluster:
return setting.clickhouse_cluster().auth().identity_case();
case FederatedQuery::ConnectionSetting::kObjectStorage:
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,10 @@ void Init(
&protoConfig.GetGateways().GetHttpGateway(),
yqCounters->GetSubgroup("subcomponent", "http_gateway"));

const auto connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
NYql::NConnector::IClient::TPtr connectorClient = nullptr;
if (protoConfig.GetGateways().GetGeneric().HasConnector()) {
connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector());
}

if (protoConfig.GetTokenAccessor().GetEnabled()) {
const auto& tokenAccessorConfig = protoConfig.GetTokenAccessor();
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,12 @@ namespace NKikimr::NKqp {
GenericGatewaysConfig};

// Init DatabaseAsyncResolver only if all requirements are met
if (DatabaseResolverActorId && GenericGatewaysConfig.HasMdbGateway() && MdbEndpointGenerator) {
if (DatabaseResolverActorId && MdbEndpointGenerator &&
(GenericGatewaysConfig.HasMdbGateway() || GenericGatewaysConfig.HasYdbMvpEndpoint())) {
result.DatabaseAsyncResolver = std::make_shared<NFq::TDatabaseAsyncResolverImpl>(
actorSystem,
DatabaseResolverActorId.value(),
"", // TODO: use YDB Gateway endpoint?
GenericGatewaysConfig.GetYdbMvpEndpoint(),
GenericGatewaysConfig.GetMdbGateway(),
MdbEndpointGenerator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,12 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri
}

static const TSet<TString> properties {
"database_name",
"protocol",
"mdb_cluster_id",
"database_name",
"protocol", // managed PG, CH
"mdb_cluster_id", // managed PG, CH
"database_id", // managed YDB
"use_tls",
"schema"
"schema", // managed PG
};

for (const auto& property: properties) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,7 @@ class TKqpHost : public IKqpHost {
TypesCtx.Get(),
FuncRegistry,
FederatedQuerySetup->DatabaseAsyncResolver,
FederatedQuerySetup->CredentialsFactory,
FederatedQuerySetup->ConnectorClient,
FederatedQuerySetup->GenericGatewayConfig
);
Expand Down
Loading