From 5280a23f8373636f634e24a775bbf5ec9ba43fd3 Mon Sep 17 00:00:00 2001 From: Amelin Nikita Date: Thu, 25 Jan 2024 14:08:13 +0300 Subject: [PATCH 1/7] YQ-2715 add ydb data source (#753) --- .../external_source_factory.cpp | 4 +++ .../generic_ut/kqp_generic_provider_ut.cpp | 21 +++++++++++ .../db_id_async_resolver/db_async_resolver.h | 4 +++ .../actors/yql_generic_source_factory.cpp | 2 +- .../ut_helpers/connector_client_mock.cpp | 36 +++++++++++++++++++ .../libcpp/ut_helpers/connector_client_mock.h | 33 +++++++++++++++++ .../connector/libcpp/ut_helpers/defaults.cpp | 4 +++ .../connector/libcpp/ut_helpers/defaults.h | 6 ++++ .../provider/yql_generic_cluster_config.cpp | 5 +++ .../provider/yql_generic_dq_integration.cpp | 6 ++++ .../provider/yql_generic_load_meta.cpp | 4 ++- 11 files changed, 123 insertions(+), 2 deletions(-) diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index cd04a3530aec..3db5257f884a 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -46,6 +46,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector().GetResult(); case EProviderType::ClickHouse: return TConnectorClientMock::TClickHouseDataSourceInstanceBuilder<>().GetResult(); + case EProviderType::Ydb: + return TConnectorClientMock::TYdbDataSourceInstanceBuilder<>().GetResult(); } } @@ -48,6 +51,8 @@ namespace NKikimr::NKqp { return CreatePostgreSQLExternalDataSource(kikimr); case EProviderType::ClickHouse: return CreateClickHouseExternalDataSource(kikimr); + case EProviderType::Ydb: + return CreateYdbExternalDataSource(kikimr); } } @@ -165,6 +170,10 @@ namespace NKikimr::NKqp { TestSelectAllFields(EProviderType::ClickHouse); } + Y_UNIT_TEST(YdbManaged) { + TestSelectAllFields(EProviderType::Ydb); + } + void TestSelectConstant(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -257,6 +266,10 @@ namespace NKikimr::NKqp { TestSelectConstant(EProviderType::ClickHouse); } + Y_UNIT_TEST(YdbManagedSelectConstant) { + TestSelectConstant(EProviderType::Ydb); + } + void TestSelectCount(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -345,6 +358,10 @@ namespace NKikimr::NKqp { TestSelectCount(EProviderType::ClickHouse); } + Y_UNIT_TEST(YdbSelectCount) { + TestSelectCount(EProviderType::Ydb); + } + void TestFilterPushdown(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -450,5 +467,9 @@ namespace NKikimr::NKqp { Y_UNIT_TEST(ClickHouseFilterPushdown) { TestFilterPushdown(EProviderType::ClickHouse); } + + Y_UNIT_TEST(YdbFilterPushdown) { + TestFilterPushdown(EProviderType::Ydb); + } } } diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h index b5fcf35bc8e2..eef520630710 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h @@ -22,6 +22,8 @@ inline EDatabaseType DatabaseTypeFromDataSourceKind(NConnector::NApi::EDataSourc return EDatabaseType::PostgreSQL; case NConnector::NApi::EDataSourceKind::CLICKHOUSE: return EDatabaseType::ClickHouse; + case NConnector::NApi::EDataSourceKind::YDB: + return EDatabaseType::Ydb; default: ythrow yexception() << "Unknown data source kind: " << NConnector::NApi::EDataSourceKind_Name(dataSourceKind); } @@ -33,6 +35,8 @@ inline NConnector::NApi::EDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseT return NConnector::NApi::EDataSourceKind::POSTGRESQL; case EDatabaseType::ClickHouse: return NConnector::NApi::EDataSourceKind::CLICKHOUSE; + case EDatabaseType::Ydb: + return NConnector::NApi::EDataSourceKind::YDB; default: ythrow yexception() << "Unknown database type: " << ToString(databaseType); } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp index b35709877445..3f0cce4a6dc9 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp @@ -16,7 +16,7 @@ namespace NYql::NDq { args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory); }; - for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric"}) { + for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric"}) { factory.RegisterSource(sourceName, genericFactory); } } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp index 172194397add..f294f08ce4ef 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp @@ -107,6 +107,42 @@ namespace NYql::NConnector::NTest { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); } + void CreateYdbExternalDataSource( + const std::shared_ptr& kikimr, + const TString& dataSourceName, + const TString& login, + const TString& password, + const TString& endpoint, + bool useTls, + const TString& databaseName) + { + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format( + R"( + CREATE OBJECT {data_source_name}_password (TYPE SECRET) WITH (value={password}); + + CREATE EXTERNAL DATA SOURCE {data_source_name} WITH ( + SOURCE_TYPE="{source_type}", + LOCATION="{endpoint}", + AUTH_METHOD="BASIC", + LOGIN="{login}", + DATABASE_NAME="{database}", + PASSWORD_SECRET_NAME="{data_source_name}_password", + USE_TLS="{use_tls}" + ); + )", + "data_source_name"_a = dataSourceName, + "login"_a = login, + "password"_a = password, + "use_tls"_a = useTls ? "TRUE" : "FALSE", + "source_type"_a = ToString(NYql::EDatabaseType::Ydb), + "endpoint"_a = endpoint, + "database"_a = databaseName); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + std::shared_ptr MakeEmptyRecordBatch(size_t rowsCount) { return arrow::RecordBatch::Make( std::make_shared(arrow::FieldVector()), diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h index f26a8e4728ad..6f09b9e5ffab 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h @@ -54,6 +54,11 @@ namespace NYql::NConnector::NTest { return TClickHouseDataSourceInstanceBuilder( \ this->Result_->mutable_data_source_instance(), \ static_cast(this)); \ + } \ + TYdbDataSourceInstanceBuilder YdbDataSourceInstance() { \ + return TYdbDataSourceInstanceBuilder( \ + this->Result_->mutable_data_source_instance(), \ + static_cast(this)); \ } MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") { @@ -200,6 +205,15 @@ namespace NYql::NConnector::NTest { const TString& serviceAccountIdSignature = DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE, const TString& databaseName = DEFAULT_DATABASE); + void CreateYdbExternalDataSource( + const std::shared_ptr& kikimr, + const TString& dataSourceName = DEFAULT_DATA_SOURCE_NAME, + const TString& login = DEFAULT_LOGIN, + const TString& password = DEFAULT_PASSWORD, + const TString& endpoint = DEFAULT_YDB_ENDPOINT, + bool useTls = DEFAULT_USE_TLS, + const TString& databaseName = DEFAULT_DATABASE); + class TConnectorClientMock: public NYql::NConnector::IClient { public: MOCK_METHOD(TResult, DescribeTableImpl, (const NApi::TDescribeTableRequest& request)); @@ -277,6 +291,25 @@ namespace NYql::NConnector::NTest { } }; + template + struct TYdbDataSourceInstanceBuilder: public TBaseDataSourceInstanceBuilder, TParent> { + using TBase = TBaseDataSourceInstanceBuilder, TParent>; + + explicit TYdbDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr) + : TBase(result, parent) + { + FillWithDefaults(); + } + + void FillWithDefaults() { + TBase::FillWithDefaults(); + this->Host(DEFAULT_YDB_HOST); + this->Port(DEFAULT_YDB_PORT); + this->Kind(NApi::EDataSourceKind::YDB); + this->Protocol(DEFAULT_YDB_PROTOCOL); + } + }; + template struct TDescribeTableResultBuilder: public TResponseBuilder { using TBuilder = TDescribeTableResultBuilder; diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp index b16aa541b43e..09fbd4f8c599 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp @@ -16,4 +16,8 @@ namespace NYql::NConnector::NTest { extern const TString DEFAULT_CH_CLUSTER_ID = "ch-managed"; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID = "sa"; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE = "sa_signature"; + + extern const TString DEFAULT_YDB_HOST = "localhost"; + extern const TString DEFAULT_YDB_DATABASE = "local"; + extern const TString DEFAULT_YDB_ENDPOINT = TStringBuilder() << DEFAULT_YDB_HOST << ':' << DEFAULT_YDB_PORT; } // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h index f8316f7e9824..f5e28b3e9f37 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h @@ -25,4 +25,10 @@ namespace NYql::NConnector::NTest { constexpr NApi::EProtocol DEFAULT_CH_PROTOCOL = NApi::EProtocol::HTTP; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE; + + extern const TString DEFAULT_YDB_DATABASE; + extern const TString DEFAULT_YDB_HOST; + constexpr int DEFAULT_YDB_PORT = 2136; + extern const TString DEFAULT_YDB_ENDPOINT; + constexpr NApi::EProtocol DEFAULT_YDB_PROTOCOL = NApi::EProtocol::NATIVE; } // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp index 900c2e37f93a..249e3e1002ff 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp @@ -172,6 +172,11 @@ namespace NYql { NYql::TGenericClusterConfig& clusterConfig) { using namespace NConnector::NApi; + if (clusterConfig.GetKind() == EDataSourceKind::YDB) { + clusterConfig.SetProtocol(EProtocol::NATIVE); + return; + } + auto it = properties.find("protocol"); if (it == properties.cend()) { ythrow yexception() << "missing 'PROTOCOL' value"; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 73ceaa500c4d..37a52fdeac1b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -162,6 +162,9 @@ namespace NYql { case NYql::NConnector::NApi::POSTGRESQL: sourceType = "PostgreSqlGeneric"; break; + case NYql::NConnector::NApi::YDB: + sourceType = "YdbGeneric"; + break; default: ythrow yexception() << "Data source kind is unknown or not specified"; break; @@ -193,6 +196,9 @@ namespace NYql { case NConnector::NApi::POSTGRESQL: properties["SourceType"] = "PostgreSql"; break; + case NConnector::NApi::YDB: + properties["SourceType"] = "Ydb"; + break; case NConnector::NApi::DATA_SOURCE_KIND_UNSPECIFIED: break; default: diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index 8befd4fd7cf4..929884754b96 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -269,6 +269,8 @@ namespace NYql { switch (dataSourceKind) { case NYql::NConnector::NApi::CLICKHOUSE: break; + case NYql::NConnector::NApi::YDB: + break; case NYql::NConnector::NApi::POSTGRESQL: { // for backward compability set schema "public" by default // TODO: simplify during https://st.yandex-team.ru/YQ-2494 @@ -324,7 +326,7 @@ namespace NYql { dbNameTarget = "postgres"; break; default: - ythrow yexception() << "Unexpected data source kind: '" + ythrow yexception() << "You must provide database name explicitly for data source kind: '" << NYql::NConnector::NApi::EDataSourceKind_Name(dataSourceKind) << "'"; } } // else take database name from table path From 634bfbb0c054507e27cd8c7c6032e210635d871c Mon Sep 17 00:00:00 2001 From: Amelin Nikita Date: Mon, 29 Jan 2024 13:55:06 +0300 Subject: [PATCH 2/7] YQ-2708 add IAM credentials to protobuf (#1340) --- .../generic/connector/api/common/data_source.proto | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ydb/library/yql/providers/generic/connector/api/common/data_source.proto b/ydb/library/yql/providers/generic/connector/api/common/data_source.proto index b29eb7e16a08..ecfb64665c9e 100644 --- a/ydb/library/yql/providers/generic/connector/api/common/data_source.proto +++ b/ydb/library/yql/providers/generic/connector/api/common/data_source.proto @@ -13,8 +13,14 @@ message TCredentials { string password = 2; } + message TToken { + string type = 1; + string value = 2; + } + oneof payload { TBasic basic = 1; + TToken token = 2; } } From 2741eb281c0aedf8488c3aa56a3e45534377012d Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Tue, 20 Feb 2024 18:59:55 +0300 Subject: [PATCH 3/7] Add `SslCaCrt` field to `TGenericConnectorConfig` (#2102) --- .../providers/common/proto/gateways_config.proto | 6 ++++-- .../providers/generic/connector/libcpp/client.cpp | 14 ++++++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index fb845b8cee7b..db2781f6e13f 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -592,9 +592,11 @@ message TGenericClusterConfig { message TGenericConnectorConfig { // Connector instance network endpoint optional NYql.NConnector.NApi.TEndpoint Endpoint = 3; - // If true, GRPC Client will use TLS encryption. - // Server cert will be verified with system CA cert pool. + // If true, Connector GRPC Client will use TLS encryption. optional bool UseSsl = 4; + // Path to the custom CA certificate to verify Connector's certs. + // If empty, the default system CA certificate pool will be used. + optional string SslCaCrt = 5; reserved 1, 2; } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp index 9d6237808377..e0fe558b7ae1 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp @@ -1,3 +1,5 @@ +#include + #include "client.h" namespace NYql::NConnector { @@ -21,10 +23,18 @@ namespace NYql::NConnector { public: TClientGRPC() = delete; TClientGRPC(const TGenericConnectorConfig& config) { - TString endpoint = TStringBuilder() << config.GetEndpoint().host() << ":" << ToString(config.GetEndpoint().port()); - GrpcConfig_ = NYdbGrpc::TGRpcClientConfig(endpoint); + GrpcConfig_ = NYdbGrpc::TGRpcClientConfig(); + GrpcConfig_.Locator = TStringBuilder() << config.GetEndpoint().host() << ":" << config.GetEndpoint().port(); GrpcConfig_.EnableSsl = config.GetUseSsl(); + // Read content of CA cert + TString rootCertData; + if (config.GetSslCaCrt()) { + rootCertData = TFileInput(config.GetSslCaCrt()).ReadAll(); + } + + GrpcConfig_.SslCredentials = grpc::SslCredentialsOptions{.pem_root_certs = rootCertData, .pem_private_key = "", .pem_cert_chain = ""}; + GrpcClient_ = std::make_unique(); // FIXME: is it OK to use single connection during the client lifetime? From df27a77c63944234bba79d2bf074ed594d8ce0e0 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Fri, 1 Mar 2024 16:47:10 +0300 Subject: [PATCH 4/7] Support YDB in YQL Generic Provider (YQv1) (#2300) * Support YDB in YQL Generic Provider (for YQv1 only) * Add `--token-accessor-endpoint` flag to dqrun * Drop some outdated tests --- .../libs/actors/clusters_from_connections.cpp | 15 +- ydb/core/fq/libs/actors/database_resolver.cpp | 16 +- ydb/core/fq/libs/actors/run_actor.cpp | 7 +- .../libs/actors/ut/database_resolver_ut.cpp | 12 +- ydb/core/fq/libs/actors/ya.make | 1 - ydb/core/fq/libs/init/init.cpp | 5 +- ydb/core/kqp/host/kqp_host.cpp | 1 + .../generic_ut/kqp_generic_provider_ut.cpp | 4 - .../common/proto/gateways_config.proto | 3 +- .../yql/providers/generic/actors/ya.make | 2 + .../generic/actors/yql_generic_read_actor.cpp | 72 ++++++-- .../generic/actors/yql_generic_read_actor.h | 2 +- .../actors/yql_generic_source_factory.cpp | 4 +- .../api/service/protos/connector.proto | 6 +- .../generic/connector/libcpp/client.cpp | 4 + .../generic/connector/libcpp/client.h | 2 - .../yql/providers/generic/proto/source.proto | 18 +- .../provider/ut/pushdown/pushdown_ut.cpp | 9 +- .../yql/providers/generic/provider/ya.make | 5 + .../provider/yql_generic_cluster_config.cpp | 22 ++- .../provider/yql_generic_dq_integration.cpp | 21 ++- .../provider/yql_generic_io_discovery.cpp | 17 +- .../provider/yql_generic_load_meta.cpp | 103 +++++++---- .../generic/provider/yql_generic_provider.cpp | 14 +- .../generic/provider/yql_generic_provider.h | 6 +- .../generic/provider/yql_generic_settings.cpp | 20 +-- .../generic/provider/yql_generic_settings.h | 15 +- .../generic/provider/yql_generic_state.h | 18 +- .../generic/provider/yql_generic_utils.cpp | 22 +++ .../generic/provider/yql_generic_utils.h | 8 + ydb/library/yql/tools/dqrun/dqrun.cpp | 39 ++++- ydb/library/yql/tools/dqrun/ya.make | 1 + ydb/services/fq/ut_integration/fq_ut.cpp | 160 ------------------ 33 files changed, 331 insertions(+), 323 deletions(-) create mode 100644 ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp create mode 100644 ydb/library/yql/providers/generic/provider/yql_generic_utils.h diff --git a/ydb/core/fq/libs/actors/clusters_from_connections.cpp b/ydb/core/fq/libs/actors/clusters_from_connections.cpp index fa505591c924..ed656497ff6a 100644 --- a/ydb/core/fq/libs/actors/clusters_from_connections.cpp +++ b/ydb/core/fq/libs/actors/clusters_from_connections.cpp @@ -220,17 +220,14 @@ void AddClustersFromConnections( switch (conn.content().setting().connection_case()) { case FederatedQuery::ConnectionSetting::kYdbDatabase: { const auto& db = conn.content().setting().ydb_database(); - auto* clusterCfg = gatewaysConfig.MutableYdb()->AddClusterMapping(); + auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping(); + clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::YDB); + clusterCfg->SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE); clusterCfg->SetName(connectionName); - clusterCfg->SetId(db.database_id()); - if (db.database()) - clusterCfg->SetDatabase(db.database()); - if (db.endpoint()) - clusterCfg->SetEndpoint(db.endpoint()); - clusterCfg->SetSecure(db.secure()); - clusterCfg->SetAddBearerToToken(common.GetUseBearerForYdb()); + clusterCfg->SetDatabaseId(db.database_id()); + clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources()); FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures); - clusters.emplace(connectionName, YdbProviderName); + clusters.emplace(connectionName, GenericProviderName); break; } case FederatedQuery::ConnectionSetting::kClickhouseCluster: { diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp index f0c7e8b16a15..91a0ec4963ea 100644 --- a/ydb/core/fq/libs/actors/database_resolver.cpp +++ b/ydb/core/fq/libs/actors/database_resolver.cpp @@ -1,5 +1,6 @@ #include "database_resolver.h" +#include #include #include #include @@ -128,7 +129,7 @@ class TResponseProcessor : public TActorBootstrapped const auto requestIter = Requests.find(ev->Get()->Request); HandledIds++; - LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got MDB API response: code=" << ev->Get()->Response->Status); + LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got API response: code=" << ev->Get()->Response->Status); try { HandleResponse(ev, requestIter, result); @@ -305,7 +306,12 @@ class TDatabaseResolver: public TActor } Y_ENSURE(endpoint); - return TDatabaseDescription{endpoint, "", 0, database, secure}; + + TVector split = StringSplitter(endpoint).Split(':'); + + Y_ENSURE(split.size() == 2); + + return TDatabaseDescription{endpoint, split[0], FromString(split[1]), database, secure}; }; Parsers[NYql::EDatabaseType::Ydb] = ydbParser; Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser]( @@ -320,9 +326,11 @@ class TDatabaseResolver: public TActor if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) { // Replace "ydb." -> "yds." ret.Endpoint[2] = 's'; + ret.Host[2] = 's'; } if (isDedicatedDb) { ret.Endpoint = "u-" + ret.Endpoint; + ret.Host = "u-" + ret.Host; } return ret; }; @@ -479,6 +487,7 @@ class TDatabaseResolver: public TActor try { TString url; if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, databaseType)) { + YQL_ENSURE(ev->Get()->YdbMvpEndpoint.Size() > 0, "empty YDB MVP Endpoint"); url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database") .AddUrlParam("databaseId", databaseId) .Build(); @@ -490,7 +499,6 @@ class TDatabaseResolver: public TActor .AddPathComponent("hosts") .Build(); } - LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << url); NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url); @@ -500,6 +508,8 @@ class TDatabaseResolver: public TActor httpRequest->Set("Authorization", token); } + LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << "url: " << httpRequest->URL); + requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth}; } catch (const std::exception& e) { const TString msg = TStringBuilder() << "error while preparing to resolve database id: " << databaseId diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 4ead981b5004..80790ccb9f7e 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -30,7 +30,6 @@ #include #include #include -#include #include #include #include @@ -1940,11 +1939,7 @@ class TRunActor : public NActors::TActorBootstrapped { } { - dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver)); - } - - { - dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver)); + dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory)); } { diff --git a/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp b/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp index ff9705190512..57a552c7a802 100644 --- a/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp +++ b/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp @@ -174,8 +174,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { })", NYql::TDatabaseResolverResponse::TDatabaseDescription{ TString{"ydb.serverless.yandexcloud.net:2135"}, - TString{""}, - 0, + TString{"ydb.serverless.yandexcloud.net"}, + 2135, TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"), true }, @@ -195,8 +195,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { })", NYql::TDatabaseResolverResponse::TDatabaseDescription{ TString{"yds.serverless.yandexcloud.net:2135"}, - TString{""}, - 0, + TString{"yds.serverless.yandexcloud.net"}, + 2135, TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"), true }, @@ -217,8 +217,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { })", NYql::TDatabaseResolverResponse::TDatabaseDescription{ TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"}, - TString{""}, - 0, + TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net"}, + 2135, TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"), true }, diff --git a/ydb/core/fq/libs/actors/ya.make b/ydb/core/fq/libs/actors/ya.make index bcf906c56a51..42d277fbc98c 100644 --- a/ydb/core/fq/libs/actors/ya.make +++ b/ydb/core/fq/libs/actors/ya.make @@ -80,7 +80,6 @@ PEERDIR( ydb/library/yql/providers/pq/provider ydb/library/yql/providers/pq/task_meta ydb/library/yql/providers/s3/provider - ydb/library/yql/providers/ydb/provider ydb/library/yql/public/issue ydb/library/yql/public/issue/protos ydb/library/yql/sql/settings diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 1dd183dcae6f..c5413c6a698b 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -177,7 +177,10 @@ void Init( &protoConfig.GetGateways().GetHttpGateway(), yqCounters->GetSubgroup("subcomponent", "http_gateway")); - const auto connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector()); + NYql::NConnector::IClient::TPtr connectorClient = nullptr; + if (protoConfig.GetGateways().GetGeneric().HasConnector()) { + connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector()); + } if (protoConfig.GetTokenAccessor().GetEnabled()) { const auto& tokenAccessorConfig = protoConfig.GetTokenAccessor(); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 1434efffa11b..2a91f095afb2 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1493,6 +1493,7 @@ class TKqpHost : public IKqpHost { TypesCtx.Get(), FuncRegistry, FederatedQuerySetup->DatabaseAsyncResolver, + nullptr, FederatedQuerySetup->ConnectorClient, FederatedQuerySetup->GenericGatewayConfig ); diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp index 8ec1995b5e53..f176716a1f78 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp @@ -108,7 +108,6 @@ namespace NKikimr::NKqp { // step 3: ReadSplits std::vector colData = {10, 20, 30, 40, 50}; clientMock->ExpectReadSplits() - .DataSourceInstance(dataSourceInstance) .Split() .Description("some binary description") .Select() @@ -208,7 +207,6 @@ namespace NKikimr::NKqp { // step 3: ReadSplits clientMock->ExpectReadSplits() - .DataSourceInstance(dataSourceInstance) .Split() .Description("some binary description") .Select() @@ -304,7 +302,6 @@ namespace NKikimr::NKqp { // step 3: ReadSplits clientMock->ExpectReadSplits() - .DataSourceInstance(dataSourceInstance) .Split() .Description("some binary description") .Select() @@ -413,7 +410,6 @@ namespace NKikimr::NKqp { std::vector filterColumnData = {42, 24}; // clang-format off clientMock->ExpectReadSplits() - .DataSourceInstance(dataSourceInstance) .Split() .Description("some binary description") .Select(select) diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index db2781f6e13f..6659e421284b 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -620,7 +620,8 @@ message TGenericGatewayConfig { /////////////////////////////// Db Resolver /////////////////////////////////// message TDbResolverConfig { - // Ydb / Yds mvp endpoint + // Ydb / Yds MVP endpoint. Expected format: + // [http|https]://host:port/ydbc/cloud-prod/ optional string YdbMvpEndpoint = 2; } diff --git a/ydb/library/yql/providers/generic/actors/ya.make b/ydb/library/yql/providers/generic/actors/ya.make index 31ec4480c9ef..40471d122e07 100644 --- a/ydb/library/yql/providers/generic/actors/ya.make +++ b/ydb/library/yql/providers/generic/actors/ya.make @@ -8,10 +8,12 @@ SRCS( PEERDIR( ydb/library/yql/dq/actors/compute ydb/library/yql/minikql/computation + ydb/library/yql/providers/common/structured_token ydb/library/yql/providers/common/token_accessor/client ydb/library/yql/providers/generic/proto ydb/library/yql/public/types ydb/library/yql/providers/generic/connector/libcpp + ydb/public/sdk/cpp/client/ydb_types/credentials ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 7de4f0a04ea1..2efba5888531 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -16,6 +17,7 @@ #include #include #include +#include namespace NYql::NDq { @@ -102,16 +104,16 @@ namespace NYql::NDq { ui64 inputIndex, TCollectStatsLevel statsLevel, NConnector::IClient::TPtr client, - const NConnector::NApi::TSelect& select, - const NConnector::NApi::TDataSourceInstance& dataSourceInstance, + NYdb::TCredentialsProviderPtr credentialsProvider, + NConnector::TSource&& source, const NActors::TActorId& computeActorId, const NKikimr::NMiniKQL::THolderFactory& holderFactory) : InputIndex_(inputIndex) , ComputeActorId_(computeActorId) , Client_(std::move(client)) + , CredentialsProvider_(std::move(credentialsProvider)) , HolderFactory_(holderFactory) - , Select_(select) - , DataSourceInstance_(dataSourceInstance) + , Source_(source) { IngressStats_.Level = statsLevel; } @@ -143,7 +145,9 @@ namespace NYql::NDq { // Prepare request NConnector::NApi::TListSplitsRequest request; - *request.mutable_selects()->Add() = Select_; + NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source + MaybeRefreshToken(select.mutable_data_source_instance()); + *request.mutable_selects()->Add() = std::move(select); // Initialize stream Client_->ListSplits(request).Subscribe( @@ -236,8 +240,11 @@ namespace NYql::NDq { std::for_each( Splits_.cbegin(), Splits_.cend(), - [&](const NConnector::NApi::TSplit& split) { request.mutable_splits()->Add()->CopyFrom(split); }); - request.mutable_data_source_instance()->CopyFrom(DataSourceInstance_); + [&](const NConnector::NApi::TSplit& split) { + NConnector::NApi::TSplit splitCopy = split; + MaybeRefreshToken(splitCopy.mutable_select()->mutable_data_source_instance()); + *request.mutable_splits()->Add() = std::move(split); + }); // Start streaming Client_->ReadSplits(request).Subscribe( @@ -403,8 +410,8 @@ namespace NYql::NDq { // It's very important to fill UV columns in the alphabet order, // paying attention to the scalar field containing block length. TVector fieldNames; - std::transform(Select_.what().items().cbegin(), - Select_.what().items().cend(), + std::transform(Source_.select().what().items().cbegin(), + Source_.select().what().items().cend(), std::back_inserter(fieldNames), [](const auto& item) { return item.column().name(); }); @@ -452,6 +459,20 @@ namespace NYql::NDq { return total; } + void MaybeRefreshToken(NConnector::NApi::TDataSourceInstance* dsi) const { + if (!dsi->credentials().has_token()) { + return; + } + + // Token may have expired. Refresh it. + Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized"); + auto iamToken = CredentialsProvider_->GetAuthInfo(); + Y_ENSURE(iamToken, "empty IAM token"); + + *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; + *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; + } + // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor YQL_CLOG(INFO, ProviderGeneric) << "PassAway :: final ingress stats" @@ -484,6 +505,7 @@ namespace NYql::NDq { const NActors::TActorId ComputeActorId_; NConnector::IClient::TPtr Client_; + NYdb::TCredentialsProviderPtr CredentialsProvider_; NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_; TVector Splits_; // accumulated list of table splits NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_; @@ -492,22 +514,21 @@ namespace NYql::NDq { NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_; const NKikimr::NMiniKQL::THolderFactory& HolderFactory_; - const NYql::NConnector::NApi::TSelect Select_; - const NYql::NConnector::NApi::TDataSourceInstance DataSourceInstance_; + NConnector::TSource Source_; }; std::pair CreateGenericReadActor(NConnector::IClient::TPtr genericClient, - Generic::TSource&& params, + NConnector::TSource&& source, ui64 inputIndex, TCollectStatsLevel statsLevel, const THashMap& /*secureParams*/, const THashMap& /*taskParams*/, const NActors::TActorId& computeActorId, - ISecuredServiceAccountCredentialsFactory::TPtr /*credentialsFactory*/, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const NKikimr::NMiniKQL::THolderFactory& holderFactory) { - const auto dsi = params.select().data_source_instance(); + const auto dsi = source.select().data_source_instance(); YQL_CLOG(INFO, ProviderGeneric) << "Creating read actor with params:" << " kind=" << NYql::NConnector::NApi::EDataSourceKind_Name(dsi.kind()) << ", endpoint=" << dsi.endpoint().ShortDebugString() @@ -526,6 +547,25 @@ namespace NYql::NDq { YQL_ENSURE(one != TString::npos && two != TString::npos && one < two, "Bad token format:" << token); */ + // Obtain token to access remote data source if necessary + NYdb::TCredentialsProviderPtr credentialProvider; + if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) { + Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized"); + + auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth( + source.GetServiceAccountId(), source.GetServiceAccountIdSignature()) + .ToJson(); + + // If service account is provided, obtain IAM-token + Y_ENSURE(structuredTokenJSON, "empty structured token"); + + auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken( + credentialsFactory, + structuredTokenJSON, + false); + credentialProvider = credentialsProviderFactory->CreateProvider(); + } + // TODO: partitioning is not implemented now, but this code will be useful for the further research: /* TStringBuilder part; @@ -543,8 +583,8 @@ namespace NYql::NDq { inputIndex, statsLevel, genericClient, - params.select(), - dsi, + std::move(credentialProvider), + std::move(source), computeActorId, holderFactory); diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h index 1bdb050dcd72..6f8b81bb3063 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h @@ -9,7 +9,7 @@ namespace NYql::NDq { std::pair - CreateGenericReadActor(NConnector::IClient::TPtr genericClient, Generic::TSource&& params, ui64 inputIndex, + CreateGenericReadActor(NConnector::IClient::TPtr genericClient, NConnector::TSource&& params, ui64 inputIndex, TCollectStatsLevel statsLevel, const THashMap& secureParams, const THashMap& taskParams, const NActors::TActorId& computeActorId, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp index 3f0cce4a6dc9..3b035bcb5378 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp @@ -10,14 +10,14 @@ namespace NYql::NDq { ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NYql::NConnector::IClient::TPtr genericClient) { auto genericFactory = [credentialsFactory, genericClient]( - Generic::TSource&& settings, + NConnector::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory); }; for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric"}) { - factory.RegisterSource(sourceName, genericFactory); + factory.RegisterSource(sourceName, genericFactory); } } diff --git a/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto b/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto index 67cd9588547c..7004f2686136 100644 --- a/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto +++ b/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto @@ -197,8 +197,10 @@ message TSplit { // ReadDataRequest reads the data associated with a particular table split. message TReadSplitsRequest { - // Data source instance to connect - TDataSourceInstance data_source_instance = 1; + // Data source instance to connect. + // Deprecated field: server implementations must rely on + // TDataSourceInstance provided in each TSelect. + TDataSourceInstance data_source_instance = 1 [deprecated = true]; // Splits that YQ engine would like to read. repeated TSplit splits = 2; diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp index e0fe558b7ae1..8280a4e36886 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp @@ -24,7 +24,11 @@ namespace NYql::NConnector { TClientGRPC() = delete; TClientGRPC(const TGenericConnectorConfig& config) { GrpcConfig_ = NYdbGrpc::TGRpcClientConfig(); + + Y_ENSURE(config.GetEndpoint().host(), TStringBuilder() << "Empty host in TGenericConnectorConfig: " << config.DebugString()); + Y_ENSURE(config.GetEndpoint().port(), TStringBuilder() << "Empty port in TGenericConnectorConfig: " << config.DebugString()); GrpcConfig_.Locator = TStringBuilder() << config.GetEndpoint().host() << ":" << config.GetEndpoint().port(); + GrpcConfig_.EnableSsl = config.GetUseSsl(); // Read content of CA cert diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.h b/ydb/library/yql/providers/generic/connector/libcpp/client.h index 1d066e31b72a..7a2250798eb8 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.h @@ -17,8 +17,6 @@ namespace NYql::NConnector { using TAsyncResult = NThreading::TFuture>; using TDescribeTableAsyncResult = TAsyncResult; - // using TListSplitsAsyncResult = TAsyncResult; - // using TReadSplitsAsyncResult = TAsyncResult; template class TStreamer { diff --git a/ydb/library/yql/providers/generic/proto/source.proto b/ydb/library/yql/providers/generic/proto/source.proto index 0911dd54ef26..2ea009080872 100644 --- a/ydb/library/yql/providers/generic/proto/source.proto +++ b/ydb/library/yql/providers/generic/proto/source.proto @@ -2,18 +2,20 @@ syntax = "proto3"; option cc_enable_arenas = true; -package NYql.Generic; +package NYql.NConnector; import "ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto"; import "ydb/library/yql/providers/generic/connector/api/common/data_source.proto"; message TSource { - // Token to access database - // FIXME: unused field, delete it: - string token = 1; // Prepared Select expression NYql.NConnector.NApi.TSelect select = 2; - // Description of instance to connect - // FIXME: DataSourceInstance is already incapsulated into select, delete it: - NYql.NConnector.NApi.TDataSourceInstance data_source_instance = 3; -} \ No newline at end of file + + // ServiceAccountId and ServiceAccountIdSignature are used to obtain tokens + // to access external data source supporting this kind of authentication + // during the runtime phase. + string ServiceAccountId = 4; + string ServiceAccountIdSignature = 5; + + reserved 1, 3; +} diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp index 10d74e5fc6cc..f632e1f2627c 100644 --- a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp +++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp @@ -144,7 +144,7 @@ struct TFakeGenericClient: public NConnector::IClient { class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase { public: - explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, Generic::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt) + explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, NConnector::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt) : TOptimizeTransformerBase(types, NLog::EComponent::ProviderGeneric, {}) , DqSourceSettings_(dqSourceSettings) , DqSourceSettingsWereBuilt_(dqSourceSettingsWereBuilt) @@ -182,13 +182,13 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase { TString sourceType; dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1); UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric"); - UNIT_ASSERT(settings.Is()); + UNIT_ASSERT(settings.Is()); settings.UnpackTo(DqSourceSettings_); *DqSourceSettingsWereBuilt_ = true; } private: - Generic::TSource* DqSourceSettings_; + NConnector::TSource* DqSourceSettings_; bool* DqSourceSettingsWereBuilt_; }; @@ -207,7 +207,7 @@ struct TPushdownFixture: public NUnitTest::TBaseFixture { TAutoPtr Transformer; TAutoPtr BuildDqSourceSettingsTransformer; - Generic::TSource DqSourceSettings; + NConnector::TSource DqSourceSettings; bool DqSourceSettingsWereBuilt = false; TExprNode::TPtr InitialExprRoot; @@ -243,6 +243,7 @@ struct TPushdownFixture: public NUnitTest::TBaseFixture { TypesCtx.Get(), FunctionRegistry.Get(), DatabaseResolver, + nullptr, GenericClient, GatewaysCfg.GetGeneric()); diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make index ca9d84e6365b..55dd70b153e2 100644 --- a/ydb/library/yql/providers/generic/provider/ya.make +++ b/ydb/library/yql/providers/generic/provider/ya.make @@ -21,6 +21,8 @@ SRCS( yql_generic_settings.cpp yql_generic_state.h yql_generic_state.cpp + yql_generic_utils.h + yql_generic_utils.cpp ) YQL_LAST_ABI_VERSION() @@ -45,13 +47,16 @@ PEERDIR( ydb/library/yql/providers/common/provider ydb/library/yql/providers/common/pushdown ydb/library/yql/providers/common/structured_token + ydb/library/yql/providers/common/token_accessor/client ydb/library/yql/providers/common/transform ydb/library/yql/providers/dq/common ydb/library/yql/providers/dq/expr_nodes ydb/library/yql/providers/generic/expr_nodes ydb/library/yql/providers/generic/proto + ydb/library/yql/providers/generic/connector/api/common ydb/library/yql/providers/generic/connector/libcpp ydb/library/yql/utils/plan + ydb/public/sdk/cpp/client/ydb_types/credentials ) END() diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp index 249e3e1002ff..e0c9f14a2225 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp @@ -317,7 +317,7 @@ namespace NYql { auto hasEndpoint = clusterConfig.HasEndpoint(); auto databaseId = clusterConfig.GetDatabaseId(); - if ((hasEndpoint && databaseId)) { + if (hasEndpoint && databaseId) { return ValidationError( clusterConfig, context, @@ -357,6 +357,22 @@ namespace NYql { "you must set either ('ServiceAccountId', 'ServiceAccountIdSignature') fields or 'Token' field or none of them"); } + if (clusterConfig.GetKind() == NConnector::NApi::YDB) { + if (clusterConfig.HasDatabaseName() && clusterConfig.HasDatabaseId()) { + return ValidationError( + clusterConfig, + context, + "For YDB clusters you must set either database name or database id, but you have set both of them"); + } + + if (!clusterConfig.HasDatabaseName() && !clusterConfig.HasDatabaseId()) { + return ValidationError( + clusterConfig, + context, + "For YDB clusters you must set either database name or database id, but you have set none of them"); + } + } + // check required fields if (!clusterConfig.GetName()) { return ValidationError(clusterConfig, context, "empty field 'Name'"); @@ -366,10 +382,6 @@ namespace NYql { return ValidationError(clusterConfig, context, "empty field 'Kind'"); } - if (!clusterConfig.GetCredentials().Getbasic().Getusername()) { - return ValidationError(clusterConfig, context, "empty field 'Credentials.basic.username'"); - } - // TODO: validate Credentials.basic.password after ClickHouse recipe fix // TODO: validate DatabaseName field during https://st.yandex-team.ru/YQ-2494 diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 37a52fdeac1b..8bee8f7eb75f 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -99,11 +99,10 @@ namespace NYql { const auto settings = maybeSettings.Cast(); const auto& clusterName = source.DataSource().Cast().Cluster().StringValue(); const auto& table = settings.Table().StringValue(); - const auto& token = settings.Token().Name().StringValue(); - const auto& endpoint = State_->Configuration->ClusterNamesToClusterConfigs[clusterName].endpoint(); + const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName]; + const auto& endpoint = clusterConfig.endpoint(); - Generic::TSource srcDesc; - srcDesc.set_token(token); + NConnector::TSource source; // for backward compability full path can be used (cluster_name.`db_name.table`) // TODO: simplify during https://st.yandex-team.ru/YQ-2494 @@ -126,7 +125,7 @@ namespace NYql { } // prepare select - auto select = srcDesc.mutable_select(); + auto select = source.mutable_select(); select->mutable_from()->set_table(TString(dbTable)); select->mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance); @@ -149,13 +148,17 @@ namespace NYql { } } - // store data source instance - srcDesc.mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance); + // Managed YDB supports access via IAM token. + // Copy service account ids to obtain tokens during request execution phase. + if (clusterConfig.kind() == NConnector::NApi::EDataSourceKind::YDB) { + source.SetServiceAccountId(clusterConfig.GetServiceAccountId()); + source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature()); + } // preserve source description for read actor - protoSettings.PackFrom(srcDesc); + protoSettings.PackFrom(source); - switch (srcDesc.data_source_instance().kind()) { + switch (select->data_source_instance().kind()) { case NYql::NConnector::NApi::CLICKHOUSE: sourceType = "ClickHouseGeneric"; break; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp index 2089adbc798e..ae9e504f3c1b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp @@ -1,4 +1,6 @@ #include "yql_generic_provider_impl.h" +#include "yql_generic_utils.h" + #include #include #include @@ -91,6 +93,7 @@ namespace NYql { for (const auto& [databaseIdWithType, databaseDescription] : response.DatabaseDescriptionMap) { YQL_CLOG(INFO, ProviderGeneric) << "resolved database id into endpoint" << ": databaseId=" << databaseIdWithType.first + << ", databaseKind=" << databaseIdWithType.second << ", host=" << databaseDescription.Host << ", port=" << databaseDescription.Port; } @@ -157,7 +160,10 @@ namespace NYql { if (clusterConfigIter == clusterNamesToClusterConfigs.end()) { TIssues issues; - issues.AddIssue(TStringBuilder() << "no cluster names for database id " << databaseIdWithType.first << " and cluster name " << clusterName); + issues.AddIssue(TStringBuilder() << "no cluster names for database id " + << databaseIdWithType.first + << " and cluster name " + << clusterName); ctx.IssueManager.AddIssues(issues); return TStatus::Error; } @@ -165,6 +171,15 @@ namespace NYql { auto endpointDst = clusterConfigIter->second.mutable_endpoint(); endpointDst->set_host(databaseDescription.Host); endpointDst->set_port(databaseDescription.Port); + + // If we work with managed YDB, we find out database name + // only after database id (== cluster id) resolving. + if (clusterConfigIter->second.kind() == NConnector::NApi::EDataSourceKind::YDB) { + clusterConfigIter->second.set_databasename(databaseDescription.Database); + } + + YQL_CLOG(INFO, ProviderGeneric) << "ModifyClusterConfigs: " + << DumpGenericClusterConfig(clusterConfigIter->second); } } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index 929884754b96..329bd634de3b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -14,10 +14,11 @@ #include #include #include -#include -#include +#include #include #include +#include +#include namespace NYql { using namespace NNodes; @@ -32,7 +33,8 @@ namespace NYql { }; class TGenericLoadTableMetadataTransformer: public TGraphTransformerBase { - using TMapType = std::unordered_map>; + using TMapType = + std::unordered_map>; public: TGenericLoadTableMetadataTransformer(TGenericState::TPtr state) @@ -48,42 +50,37 @@ namespace NYql { } std::unordered_set pendingTables; - const auto& reads = FindNodes(input, - [&](const TExprNode::TPtr& node) { - if (const auto maybeRead = TMaybeNode(node)) { - return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; - } - return false; - }); + const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) { + if (const auto maybeRead = TMaybeNode(node)) { + return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; + } + return false; + }); if (!reads.empty()) { for (const auto& r : reads) { const TGenRead read(r); if (!read.FreeArgs().Get(2).Ref().IsCallable("MrTableConcat")) { - ctx.AddError( - TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key")); + ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key")); return TStatus::Error; } const auto maybeKey = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Maybe(); if (!maybeKey) { - ctx.AddError( - TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key")); + ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key")); return TStatus::Error; } const auto& keyArg = maybeKey.Cast().Ref().Head(); if (!keyArg.IsList() || keyArg.ChildrenSize() != 2U || !keyArg.Head().IsAtom("table") || !keyArg.Tail().IsCallable(TCoString::CallableName())) { - ctx.AddError( - TIssue(ctx.GetPosition(keyArg.Pos()), TStringBuilder() << "Expected single table name")); + ctx.AddError(TIssue(ctx.GetPosition(keyArg.Pos()), "Expected single table name")); return TStatus::Error; } const auto clusterName = read.DataSource().Cluster().StringValue(); const auto tableName = TString(keyArg.Tail().Head().Content()); if (pendingTables.insert(TGenericState::TTableAddress(clusterName, tableName)).second) { - YQL_CLOG(INFO, ProviderGeneric) - << "Loading table meta for: `" << clusterName << "`.`" << tableName << "`"; + YQL_CLOG(INFO, ProviderGeneric) << "Loading table meta for: `" << clusterName << "`.`" << tableName << "`"; } } } @@ -108,6 +105,7 @@ namespace NYql { auto desc = emplaceIt.first->second; desc->DataSourceInstance = request.data_source_instance(); + Y_ENSURE(State_->GenericClient); State_->GenericClient->DescribeTable(request).Subscribe( [desc = std::move(desc), promise = std::move(promise)](const NConnector::TDescribeTableAsyncResult& f1) mutable { NConnector::TDescribeTableAsyncResult f2(f1); @@ -196,14 +194,13 @@ namespace NYql { } else { const auto& error = response.error(); NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()), - TStringBuilder() - << "Loading metadata for table: " << clusterName << '.' << tableName); + TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName); hasErrors = true; break; } } else { - ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), - TStringBuilder() << "Not found result for " << clusterName << '.' << tableName)); + ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() + << "Not found result for " << clusterName << '.' << tableName)); hasErrors = true; break; } @@ -222,10 +219,8 @@ namespace NYql { } private: - const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, - const std::string_view& cluster, - const std::string_view& table, TExprContext& ctx, - TVector& columnOrder) try { + const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, const std::string_view& cluster, + const std::string_view& table, TExprContext& ctx, TVector& columnOrder) try { TVector items; auto columns = schema.columns(); @@ -250,20 +245,60 @@ namespace NYql { return nullptr; } - void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, const TString& tablePath) { + void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, + const TString& tablePath) { const auto dataSourceKind = clusterConfig.GetKind(); auto dsi = request.mutable_data_source_instance(); - *dsi->mutable_endpoint() = clusterConfig.GetEndpoint(); dsi->set_kind(dataSourceKind); - *dsi->mutable_credentials() = clusterConfig.GetCredentials(); dsi->set_use_tls(clusterConfig.GetUseSsl()); dsi->set_protocol(clusterConfig.GetProtocol()); + FillCredentials(request, clusterConfig); FillTypeMappingSettings(request); FillDataSourceOptions(request, clusterConfig); FillTablePath(request, clusterConfig, tablePath); } + void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { + auto dsi = request.mutable_data_source_instance(); + + // If login/password is provided, just copy them into request + if (clusterConfig.GetCredentials().Hasbasic()) { + *dsi->mutable_credentials() = clusterConfig.GetCredentials(); + return; + } + + Y_ENSURE(State_->CredentialsFactory, "CredentialsFactory is not initialized"); + + // If service account is provided, prepare to obtain IAM-token + + auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth( + clusterConfig.GetServiceAccountId(), + clusterConfig.GetServiceAccountIdSignature()) + .ToJson(); + Y_ENSURE(structuredTokenJSON, "empty structured token"); + + // Create provider or get existing one. + // It's crucial to reuse providers because their construction implies synchronous IO. + auto providersIt = State_->CredentialProviders.find(clusterConfig.name()); + if (providersIt == State_->CredentialProviders.end()) { + auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken( + State_->CredentialsFactory, + structuredTokenJSON, + false); + + providersIt = State_->CredentialProviders.emplace( + std::make_pair(clusterConfig.name(), credentialsProviderFactory->CreateProvider())) + .first; + } + + auto iamToken = providersIt->second->GetAuthInfo(); + Y_ENSURE(iamToken, "empty IAM token"); + + *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; + *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; + } + void FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { const auto dataSourceKind = clusterConfig.GetKind(); switch (dataSourceKind) { @@ -287,13 +322,14 @@ namespace NYql { } break; default: - ythrow yexception() << "Unexpected data source kind: '" - << NYql::NConnector::NApi::EDataSourceKind_Name(dataSourceKind) << "'"; + ythrow yexception() << "Unexpected data source kind: '" << NYql::NConnector::NApi::EDataSourceKind_Name(dataSourceKind) + << "'"; } } void FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request) { - const TString dateTimeFormat = State_->Configuration->DateTimeFormat.Get().GetOrElse(TGenericSettings::TDefault::DateTimeFormat); + const TString dateTimeFormat = + State_->Configuration->DateTimeFormat.Get().GetOrElse(TGenericSettings::TDefault::DateTimeFormat); if (dateTimeFormat == "string") { request.mutable_type_mapping_settings()->set_date_time_format(NConnector::NApi::STRING_FORMAT); } else if (dateTimeFormat == "YQL") { @@ -303,7 +339,8 @@ namespace NYql { } } - void FillTablePath(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, const TString& tablePath) { + void FillTablePath(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, + const TString& tablePath) { // for backward compability full path can be used (cluster_name.`db_name.table`) // TODO: simplify during https://st.yandex-team.ru/YQ-2494 const auto dataSourceKind = clusterConfig.GetKind(); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp index d2178ccd5b0e..c720e1b64d0e 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp @@ -6,13 +6,14 @@ namespace NYql { TDataProviderInitializer GetGenericDataProviderInitializer(NConnector::IClient::TPtr genericClient, - const std::shared_ptr dbResolver) + const IDatabaseAsyncResolver::TPtr& dbResolver, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) { - return [genericClient, dbResolver](const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, - const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - TIntrusivePtr randomProvider, TIntrusivePtr typeCtx, - const TOperationProgressWriter& progressWriter, const TYqlOperationOptions& operationOptions, - THiddenQueryAborter) + return [genericClient, dbResolver, credentialsFactory](const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + TIntrusivePtr randomProvider, TIntrusivePtr typeCtx, + const TOperationProgressWriter& progressWriter, const TYqlOperationOptions& operationOptions, + THiddenQueryAborter) { Y_UNUSED(sessionId); Y_UNUSED(userName); @@ -25,6 +26,7 @@ namespace NYql { typeCtx.Get(), functionRegistry, dbResolver, + credentialsFactory, genericClient, gatewaysConfig->GetGeneric()); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h index d990b2084bb4..5c8e4c967a8c 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h @@ -2,14 +2,14 @@ #include "yql_generic_state.h" -#include #include #include namespace NYql { TDataProviderInitializer GetGenericDataProviderInitializer( - NConnector::IClient::TPtr genericClient, // required - std::shared_ptr dbResolver = nullptr // can be missing in on-prem installations + NConnector::IClient::TPtr genericClient, // required + const IDatabaseAsyncResolver::TPtr& dbResolver = nullptr, // can be missing in on-prem installations + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory = nullptr // can be missing in on-prem installations ); TIntrusivePtr CreateGenericDataSource(TGenericState::TPtr state); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp index 0b4c93a8bf4d..1c2521573ddb 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp @@ -1,5 +1,6 @@ #include "yql_generic_cluster_config.h" #include "yql_generic_settings.h" +#include "yql_generic_utils.h" #include #include @@ -34,7 +35,7 @@ namespace NYql { const TCredentials::TPtr& credentials) { ValidateGenericClusterConfig(clusterConfig, "TGenericConfiguration::AddCluster"); - YQL_CLOG(INFO, ProviderGeneric) << "generic provider add cluster: " << DumpGenericClusterConfig(clusterConfig); + YQL_CLOG(INFO, ProviderGeneric) << "GenericConfiguration::AddCluster: " << DumpGenericClusterConfig(clusterConfig); const auto& clusterName = clusterConfig.GetName(); const auto& databaseId = clusterConfig.GetDatabaseId(); @@ -95,23 +96,6 @@ namespace NYql { "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config"; } - TString TGenericConfiguration::DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) const { - TStringBuilder sb; - sb << "name = " << clusterConfig.GetName() - << ", kind = " << NConnector::NApi::EDataSourceKind_Name(clusterConfig.GetKind()) - << ", database name = " << clusterConfig.GetDatabaseName() - << ", database id = " << clusterConfig.GetName() - << ", endpoint = " << clusterConfig.GetEndpoint() - << ", use tls = " << clusterConfig.GetUseSsl() - << ", protocol = " << NConnector::NApi::EProtocol_Name(clusterConfig.GetProtocol()); - - for (const auto& [key, value] : clusterConfig.GetDataSourceOptions()) { - sb << ", " << key << " = " << value; - } - - return sb; - } - TGenericSettings::TConstPtr TGenericConfiguration::Snapshot() const { return std::make_shared(*this); } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index d783963a6589..07a19c5ce827 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -25,26 +25,21 @@ namespace NYql { TGenericConfiguration(); TGenericConfiguration(const TGenericConfiguration&) = delete; - void Init(const NYql::TGenericGatewayConfig& gatewayConfig, - const std::shared_ptr databaseResolver, - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, - const TCredentials::TPtr& credentials); + void Init(const NYql::TGenericGatewayConfig& gatewayConfig, const std::shared_ptr databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, const TCredentials::TPtr& credentials); - void AddCluster(const TGenericClusterConfig& clusterConfig, - const std::shared_ptr databaseResolver, - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, - const TCredentials::TPtr& credentials); + void AddCluster(const TGenericClusterConfig& clusterConfig, const std::shared_ptr databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, const TCredentials::TPtr& credentials); TGenericSettings::TConstPtr Snapshot() const; bool HasCluster(TStringBuf cluster) const; private: TString MakeStructuredToken(const TGenericClusterConfig& clusterConfig, const TCredentials::TPtr& credentials) const; - TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) const; public: THashMap Tokens; THashMap ClusterNamesToClusterConfigs; // cluster name -> cluster config THashMap> DatabaseIdsToClusterNames; // database id -> cluster name }; -} +} //namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h index e2362bc5ad27..3d69efdfe0d3 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_state.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h @@ -3,7 +3,9 @@ #include "yql_generic_settings.h" #include +#include #include +#include namespace NKikimr::NMiniKQL { class IFunctionRegistry; @@ -29,13 +31,15 @@ namespace NYql { TGenericState( TTypeAnnotationContext* types, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const std::shared_ptr& databaseResolver, + const std::shared_ptr& databaseResolver, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, const NConnector::IClient::TPtr& genericClient, const TGenericGatewayConfig& gatewayConfig) : Types(types) , Configuration(MakeIntrusive()) , FunctionRegistry(functionRegistry) , DatabaseResolver(databaseResolver) + , CredentialsFactory(credentialsFactory) , GenericClient(genericClient) { Configuration->Init(gatewayConfig, databaseResolver, DatabaseAuth, types->Credentials); @@ -49,9 +53,15 @@ namespace NYql { TGenericConfiguration::TPtr Configuration = MakeIntrusive(); const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; - // key - (database id, database type), value - credentials to access MDB API - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth; - std::shared_ptr DatabaseResolver; + // key - (database id, database type), value - credentials to access managed APIs + IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth; + std::shared_ptr DatabaseResolver; + + // key - cluster name, value - TCredentialsProviderPtr + // It's important to cache credentials providers, because they make IO + // (synchronous call via Token Accessor client) during the construction. + std::unordered_map CredentialProviders; + ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; NConnector::IClient::TPtr GenericClient; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp new file mode 100644 index 000000000000..aba0b51924b3 --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp @@ -0,0 +1,22 @@ +#include "yql_generic_utils.h" + +#include + +namespace NYql { + TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) { + TStringBuilder sb; + sb << "name = " << clusterConfig.GetName() + << ", kind = " << NConnector::NApi::EDataSourceKind_Name(clusterConfig.GetKind()) + << ", database name = " << clusterConfig.GetDatabaseName() + << ", database id = " << clusterConfig.GetDatabaseId() + << ", endpoint = " << clusterConfig.GetEndpoint() + << ", use tls = " << clusterConfig.GetUseSsl() + << ", protocol = " << NConnector::NApi::EProtocol_Name(clusterConfig.GetProtocol()); + + for (const auto& [key, value] : clusterConfig.GetDataSourceOptions()) { + sb << ", " << key << " = " << value; + } + + return sb; + } +} diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_utils.h b/ydb/library/yql/providers/generic/provider/yql_generic_utils.h new file mode 100644 index 000000000000..49c6bab7abca --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_utils.h @@ -0,0 +1,8 @@ +#pragma once + +#include +#include + +namespace NYql { + TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig); +} diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index 7d71897cc07e..1ea9cb995e76 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #include #include #include @@ -92,6 +93,7 @@ #include #include #include +#include #include #ifdef PROFILE_MEMORY_ALLOCATIONS @@ -223,14 +225,20 @@ class TOptPipelineConfigurator : public IPipelineConfigurator { IOutputStream* TracePlan; }; -NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway, NYql::NConnector::IClient::TPtr genericClient, size_t HTTPmaxTimeSeconds, size_t maxRetriesCount) { +NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory( + const NYdb::TDriver& driver, + IHTTPGateway::TPtr httpGateway, + NYql::NConnector::IClient::TPtr genericClient, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + size_t HTTPmaxTimeSeconds, + size_t maxRetriesCount) { auto factory = MakeIntrusive(); RegisterDqPqReadActorFactory(*factory, driver, nullptr); RegisterYdbReadActorFactory(*factory, driver, nullptr); RegisterS3ReadActorFactory(*factory, nullptr, httpGateway, GetHTTPDefaultRetryPolicy(TDuration::Seconds(HTTPmaxTimeSeconds), maxRetriesCount), {}, nullptr); RegisterS3WriteActorFactory(*factory, nullptr, httpGateway); RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway); - RegisterGenericReadActorFactory(*factory, nullptr, genericClient); + RegisterGenericReadActorFactory(*factory, credentialsFactory, genericClient); RegisterDqPqWriteActorFactory(*factory, driver, nullptr); @@ -409,6 +417,7 @@ int RunMain(int argc, const char* argv[]) TString mountConfig; TString mestricsPusherConfig; TString udfResolver; + TString tokenAccessorEndpoint; bool udfResolverFilterSyscalls = false; TString statFile; TString metricsFile; @@ -559,6 +568,10 @@ int RunMain(int argc, const char* argv[]) failureInjections[key] = std::make_pair(ui32(0), FromString(fail)); } }); + opts.AddLongOption("token-accessor-endpoint", "Network address of Token Accessor service in format grpc(s)://host:port") + .Optional() + .RequiredArgument("ENDPOINT") + .StoreResult(&tokenAccessorEndpoint); opts.AddHelpOption('h'); opts.SetFreeArgsNum(0); @@ -743,6 +756,16 @@ int RunMain(int argc, const char* argv[]) ); } + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory; + + if (tokenAccessorEndpoint) { + TVector ss = StringSplitter(tokenAccessorEndpoint).SplitByString("://"); + YQL_ENSURE(ss.size() == 2, "Invalid tokenAccessorEndpoint: " << tokenAccessorEndpoint); + + credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(ss[1], ss[0] == "grpcs", ""); + } + + NConnector::IClient::TPtr genericClient; if (gatewaysConfig.HasGeneric()) { for (auto& cluster : *gatewaysConfig.MutableGeneric()->MutableClusterMapping()) { @@ -750,7 +773,8 @@ int RunMain(int argc, const char* argv[]) } genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector()); - dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver)); + + dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver, credentialsFactory)); } if (gatewaysConfig.HasYdb()) { @@ -818,11 +842,10 @@ int RunMain(int argc, const char* argv[]) size_t requestTimeout = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasRequestTimeoutSeconds() ? gatewaysConfig.GetHttpGateway().GetRequestTimeoutSeconds() : 100; size_t maxRetries = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasMaxRetries() ? gatewaysConfig.GetHttpGateway().GetMaxRetries() : 2; - const bool enableSpilling = res.Has("enable-spilling"); - dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, - enableSpilling, CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads, - metricsRegistry, - metricsPusherFactory); + bool enableSpilling = res.Has("enable-spilling"); + dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, enableSpilling, + CreateAsyncIoFactory(driver, httpGateway, genericClient, credentialsFactory, requestTimeout, maxRetries), threads, + metricsRegistry, metricsPusherFactory); } gateways.emplace_back(dqGateway); diff --git a/ydb/library/yql/tools/dqrun/ya.make b/ydb/library/yql/tools/dqrun/ya.make index 5643a263eacf..c47f3877d79a 100644 --- a/ydb/library/yql/tools/dqrun/ya.make +++ b/ydb/library/yql/tools/dqrun/ya.make @@ -42,6 +42,7 @@ ENDIF() ydb/library/yql/providers/clickhouse/provider ydb/library/yql/providers/common/comp_nodes ydb/library/yql/providers/common/proto + ydb/library/yql/providers/common/token_accessor/client ydb/library/yql/providers/common/udf_resolve ydb/library/yql/providers/generic/actors ydb/library/yql/providers/generic/provider diff --git a/ydb/services/fq/ut_integration/fq_ut.cpp b/ydb/services/fq/ut_integration/fq_ut.cpp index 1b5764fee4cf..c011c80138d7 100644 --- a/ydb/services/fq/ut_integration/fq_ut.cpp +++ b/ydb/services/fq/ut_integration/fq_ut.cpp @@ -197,32 +197,6 @@ Y_UNIT_TEST_SUITE(Yq_1) { } } - Y_UNIT_TEST(Basic_EmptyTable) { - TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - UpsertToExistingTable(driver, location); - NYdb::NFq::TClient client(driver); - const TString folderId = "some_folder_id"; - { - const auto request = ::NFq::TCreateConnectionBuilder() - .SetName("testdbempty") - .CreateYdb("Root", location, "") - .Build(); - const auto result = client - .CreateConnection(request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); - } - - const TString queryId = CreateNewHistoryAndWaitFinish( - folderId, client, - "select count(*) from testdbempty.`yq/empty_table`", - FederatedQuery::QueryMeta::COMPLETED); - CheckGetResultData(client, queryId, folderId, 1, 1, 0); - } - Y_UNIT_TEST(Basic_EmptyList) { TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); ui16 grpc = server.GetPort(); @@ -256,32 +230,6 @@ Y_UNIT_TEST_SUITE(Yq_1) { CreateNewHistoryAndWaitFinish(folderId, client, "select null", expectedStatus); } - SIMPLE_UNIT_FORKED_TEST(Basic_Tagged) { - TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - NYdb::NFq::TClient client(driver); - const TString folderId = "some_folder_id"; - - - { - auto request = ::NFq::TCreateConnectionBuilder{} - .SetName("testdb00") - .CreateYdb("Root", location, "") - .Build(); - - auto result = client.CreateConnection( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } - - auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED; - CreateNewHistoryAndWaitFinish(folderId, client, "select AsTagged(count(*), \"tag\") from testdb00.`yq/connections`", expectedStatus); - } - Y_UNIT_TEST(Basic_TaggedLiteral) { TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); ui16 grpc = server.GetPort(); @@ -295,50 +243,6 @@ Y_UNIT_TEST_SUITE(Yq_1) { } // use fork for data test due to ch initialization problem - SIMPLE_UNIT_FORKED_TEST(ExtendedDatabaseId) { - TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - - NYdb::NFq::TClient client(driver); - const TString folderId = "folder_id_" + CreateGuidAsString(); - { - const auto request = ::NFq::TCreateConnectionBuilder() - .SetName("testdb01") - .CreateYdb("FakeDatabaseId", "") - .Build(); - const auto result = client - .CreateConnection(request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); - } - - { - const auto request = ::NFq::TCreateConnectionBuilder() - .SetName("testdb02") - .CreateYdb("FakeDatabaseId", "") - .Build(); - const auto result = client - .CreateConnection(request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); - } - - { - const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, - "select count(*) from testdb01.`yq/connections`", FederatedQuery::QueryMeta::COMPLETED); - CheckGetResultData(client, queryId, folderId, 1, 1, 2); - } - - { - // test connections db with 2 databaseId - const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, - "select count(*) from testdb02.`yq/connections`", FederatedQuery::QueryMeta::COMPLETED); - CheckGetResultData(client, queryId, folderId, 1, 1, 2); - } - } - Y_UNIT_TEST(DescribeConnection) { TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); ui16 grpc = server.GetPort(); @@ -855,70 +759,6 @@ Y_UNIT_TEST_SUITE(Yq_1) { } } -Y_UNIT_TEST_SUITE(Yq_2) { - SIMPLE_UNIT_FORKED_TEST(ReadFromYdbOverYq) { - TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - NYdb::NFq::TClient client(driver); - const auto folderId = TString(__func__) + "folder_id"; - - { - auto request = ::NFq::TCreateConnectionBuilder{} - .SetName("testdb00") - .CreateYdb("Root", location, "") - .Build(); - - auto result = client.CreateConnection( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } - - TString queryId; - { - auto request = ::NFq::TCreateQueryBuilder{} - .SetText("select count(*) from testdb00.`yq/connections`") - .Build(); - auto result = client.CreateQuery( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - queryId = result.GetResult().query_id(); - } - - { - auto request = ::NFq::TDescribeQueryBuilder{}.SetQueryId(queryId).Build(); - auto result = DoWithRetryOnRetCode([&]() { - auto result = client.DescribeQuery( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - const auto status = result.GetResult().query().meta().status(); - PrintProtoIssues(result.GetResult().query().issue()); - return status == FederatedQuery::QueryMeta::COMPLETED; - }, TRetryOptions(10)); - UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); - } - - { - auto request = ::NFq::TGetResultDataBuilder{}.SetQueryId(queryId).Build(); - auto result = client.GetResultData( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - - const auto& resultSet = result.GetResult().result_set(); - UNIT_ASSERT_VALUES_EQUAL(resultSet.rows().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.columns().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.rows(0).items(0).uint64_value(), 1); - } - } -} - Y_UNIT_TEST_SUITE(PrivateApi) { Y_UNIT_TEST(PingTask) { TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); From 8f97d79508b8cb35f4a5ad3a9454ff32d07f2d61 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Mon, 4 Mar 2024 19:45:05 +0300 Subject: [PATCH 5/7] New methods to provide IAM-tokens when accessing managed YDB from `dqrun`. (#2386) --- .../common/proto/gateways_config.proto | 11 ++- .../yql/providers/generic/actors/ya.make | 1 + .../generic/actors/yql_generic_read_actor.cpp | 55 ++++----------- .../generic/actors/yql_generic_read_actor.h | 2 +- .../actors/yql_generic_source_factory.cpp | 4 +- .../actors/yql_generic_token_provider.cpp | 67 +++++++++++++++++++ .../actors/yql_generic_token_provider.h | 30 +++++++++ .../yql/providers/generic/proto/source.proto | 11 +-- .../provider/ut/pushdown/pushdown_ut.cpp | 8 +-- .../provider/yql_generic_dq_integration.cpp | 9 ++- .../provider/yql_generic_load_meta.cpp | 22 ++++-- 11 files changed, 154 insertions(+), 66 deletions(-) create mode 100644 ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp create mode 100644 ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 6659e421284b..865d42bc7bf8 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -563,12 +563,11 @@ message TGenericClusterConfig { // Credentials used to access data source instance optional NYql.NConnector.NApi.TCredentials Credentials = 10; - // Credentials used to access MDB API. - // When working with data source instances deployed in a cloud, - // you should either set (ServiceAccountId, ServiceAccountIdSignature) pair, - // or set IAM Token. - // The names of these fields must satisfy this template function: - // https://github.com/ydb-platform/ydb/arcadia/contrib/ydb/core/fq/libs/actors/clusters_from_connections.cpp?rev=r11823087#L19 + // Credentials used to access managed databases APIs. + // When working with external data source instances deployed in clouds, + // one should either set (ServiceAccountId, ServiceAccountIdSignature) pair + // that will be resolved into IAM Token via Token Accessor, + // or provide IAM Token directly. optional string ServiceAccountId = 6; optional string ServiceAccountIdSignature = 7; optional string Token = 11; diff --git a/ydb/library/yql/providers/generic/actors/ya.make b/ydb/library/yql/providers/generic/actors/ya.make index 40471d122e07..53f40afdca7c 100644 --- a/ydb/library/yql/providers/generic/actors/ya.make +++ b/ydb/library/yql/providers/generic/actors/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( yql_generic_read_actor.cpp yql_generic_source_factory.cpp + yql_generic_token_provider.cpp ) PEERDIR( diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 2efba5888531..51c02bb40456 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -1,4 +1,5 @@ #include "yql_generic_read_actor.h" +#include "yql_generic_token_provider.h" #include #include @@ -9,11 +10,9 @@ #include #include #include -#include #include #include #include -#include #include #include #include @@ -104,14 +103,14 @@ namespace NYql::NDq { ui64 inputIndex, TCollectStatsLevel statsLevel, NConnector::IClient::TPtr client, - NYdb::TCredentialsProviderPtr credentialsProvider, - NConnector::TSource&& source, + TGenericTokenProvider::TPtr tokenProvider, + Generic::TSource&& source, const NActors::TActorId& computeActorId, const NKikimr::NMiniKQL::THolderFactory& holderFactory) : InputIndex_(inputIndex) , ComputeActorId_(computeActorId) , Client_(std::move(client)) - , CredentialsProvider_(std::move(credentialsProvider)) + , TokenProvider_(std::move(tokenProvider)) , HolderFactory_(holderFactory) , Source_(source) { @@ -146,7 +145,7 @@ namespace NYql::NDq { // Prepare request NConnector::NApi::TListSplitsRequest request; NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source - MaybeRefreshToken(select.mutable_data_source_instance()); + TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance()); *request.mutable_selects()->Add() = std::move(select); // Initialize stream @@ -242,7 +241,7 @@ namespace NYql::NDq { Splits_.cbegin(), Splits_.cend(), [&](const NConnector::NApi::TSplit& split) { NConnector::NApi::TSplit splitCopy = split; - MaybeRefreshToken(splitCopy.mutable_select()->mutable_data_source_instance()); + TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance()); *request.mutable_splits()->Add() = std::move(split); }); @@ -459,20 +458,6 @@ namespace NYql::NDq { return total; } - void MaybeRefreshToken(NConnector::NApi::TDataSourceInstance* dsi) const { - if (!dsi->credentials().has_token()) { - return; - } - - // Token may have expired. Refresh it. - Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized"); - auto iamToken = CredentialsProvider_->GetAuthInfo(); - Y_ENSURE(iamToken, "empty IAM token"); - - *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; - *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; - } - // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor YQL_CLOG(INFO, ProviderGeneric) << "PassAway :: final ingress stats" @@ -505,7 +490,7 @@ namespace NYql::NDq { const NActors::TActorId ComputeActorId_; NConnector::IClient::TPtr Client_; - NYdb::TCredentialsProviderPtr CredentialsProvider_; + TGenericTokenProvider::TPtr TokenProvider_; NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_; TVector Splits_; // accumulated list of table splits NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_; @@ -514,12 +499,12 @@ namespace NYql::NDq { NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_; const NKikimr::NMiniKQL::THolderFactory& HolderFactory_; - NConnector::TSource Source_; + Generic::TSource Source_; }; std::pair CreateGenericReadActor(NConnector::IClient::TPtr genericClient, - NConnector::TSource&& source, + Generic::TSource&& source, ui64 inputIndex, TCollectStatsLevel statsLevel, const THashMap& /*secureParams*/, @@ -548,24 +533,6 @@ namespace NYql::NDq { */ // Obtain token to access remote data source if necessary - NYdb::TCredentialsProviderPtr credentialProvider; - if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) { - Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized"); - - auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth( - source.GetServiceAccountId(), source.GetServiceAccountIdSignature()) - .ToJson(); - - // If service account is provided, obtain IAM-token - Y_ENSURE(structuredTokenJSON, "empty structured token"); - - auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken( - credentialsFactory, - structuredTokenJSON, - false); - credentialProvider = credentialsProviderFactory->CreateProvider(); - } - // TODO: partitioning is not implemented now, but this code will be useful for the further research: /* TStringBuilder part; @@ -579,11 +546,13 @@ namespace NYql::NDq { part << ';'; */ + auto tokenProvider = CreateGenericTokenProvider(source, credentialsFactory); + const auto actor = new TGenericReadActor( inputIndex, statsLevel, genericClient, - std::move(credentialProvider), + std::move(tokenProvider), std::move(source), computeActorId, holderFactory); diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h index 6f8b81bb3063..1bdb050dcd72 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h @@ -9,7 +9,7 @@ namespace NYql::NDq { std::pair - CreateGenericReadActor(NConnector::IClient::TPtr genericClient, NConnector::TSource&& params, ui64 inputIndex, + CreateGenericReadActor(NConnector::IClient::TPtr genericClient, Generic::TSource&& params, ui64 inputIndex, TCollectStatsLevel statsLevel, const THashMap& secureParams, const THashMap& taskParams, const NActors::TActorId& computeActorId, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp index 3b035bcb5378..3f0cce4a6dc9 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp @@ -10,14 +10,14 @@ namespace NYql::NDq { ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NYql::NConnector::IClient::TPtr genericClient) { auto genericFactory = [credentialsFactory, genericClient]( - NConnector::TSource&& settings, + Generic::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory); }; for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric"}) { - factory.RegisterSource(sourceName, genericFactory); + factory.RegisterSource(sourceName, genericFactory); } } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp new file mode 100644 index 000000000000..e8430b87e9ec --- /dev/null +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp @@ -0,0 +1,67 @@ +#include "yql_generic_token_provider.h" + +#include + +namespace NYql::NDq { + TGenericTokenProvider::TGenericTokenProvider( + const NYql::Generic::TSource& source, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) + : Source_(source) + , StaticIAMToken_(source.GetToken()) + , CredentialsProvider_(nullptr) + { + // 1. User has provided IAM-token itself. + // This token will be used during the whole lifetime of a read actor. + if (!StaticIAMToken_.empty()) { + return; + } + + // 2. User has provided service account creds. + // We create token accessor client that will renew token accessor by demand. + if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) { + Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized"); + + auto structuredTokenJSON = + TStructuredTokenBuilder() + .SetServiceAccountIdAuth(source.GetServiceAccountId(), source.GetServiceAccountIdSignature()) + .ToJson(); + + // If service account is provided, obtain IAM-token + Y_ENSURE(structuredTokenJSON, "empty structured token"); + + auto credentialsProviderFactory = + CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false); + CredentialsProvider_ = credentialsProviderFactory->CreateProvider(); + } + + // 3. If we reached this point, it means that user doesn't need token auth. + } + + void TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const { + // 1. Don't need tokens if basic auth is set + if (dsi.credentials().has_basic()) { + return; + } + + *dsi.mutable_credentials()->mutable_token()->mutable_type() = "IAM"; + + // 2. If static IAM-token has been provided, use it + if (!StaticIAMToken_.empty()) { + *dsi.mutable_credentials()->mutable_token()->mutable_value() = StaticIAMToken_; + return; + } + + // 3. Otherwise use credentials provider to get token + Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized"); + + auto iamToken = CredentialsProvider_->GetAuthInfo(); + Y_ENSURE(iamToken, "CredentialsProvider returned empty IAM token"); + + *dsi.mutable_credentials()->mutable_token()->mutable_value() = std::move(iamToken); + } + + TGenericTokenProvider::TPtr + CreateGenericTokenProvider(const NYql::Generic::TSource& source, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) { + return std::make_unique(source, credentialsFactory); + } +} //namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h new file mode 100644 index 000000000000..495a44c15e57 --- /dev/null +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include +#include + +namespace NYql::NDq { + // When accessing external data sources using authentication via tokens, + // there are two options: + // 1. Use static IAM-token provided by user (especially useful during debugging); + // 2. Use service account credentials in order to get (and refresh) IAM-token by demand. + class TGenericTokenProvider { + public: + using TPtr = std::unique_ptr; + + TGenericTokenProvider(const NYql::Generic::TSource& source, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); + + void MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const; + + private: + NYql::Generic::TSource Source_; + TString StaticIAMToken_; + NYdb::TCredentialsProviderPtr CredentialsProvider_; + }; + + TGenericTokenProvider::TPtr + CreateGenericTokenProvider(const NYql::Generic::TSource& source, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); +} //namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/proto/source.proto b/ydb/library/yql/providers/generic/proto/source.proto index 2ea009080872..060593a99c15 100644 --- a/ydb/library/yql/providers/generic/proto/source.proto +++ b/ydb/library/yql/providers/generic/proto/source.proto @@ -2,7 +2,7 @@ syntax = "proto3"; option cc_enable_arenas = true; -package NYql.NConnector; +package NYql.Generic; import "ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto"; import "ydb/library/yql/providers/generic/connector/api/common/data_source.proto"; @@ -11,11 +11,14 @@ message TSource { // Prepared Select expression NYql.NConnector.NApi.TSelect select = 2; - // ServiceAccountId and ServiceAccountIdSignature are used to obtain tokens - // to access external data source supporting this kind of authentication - // during the runtime phase. + // Credentials used to access managed databases APIs. + // When working with external data source instances deployed in clouds, + // one should either set (ServiceAccountId, ServiceAccountIdSignature) pair + // that will be resolved into IAM Token via Token Accessor, + // or provide IAM Token directly. string ServiceAccountId = 4; string ServiceAccountIdSignature = 5; + string Token = 6; reserved 1, 3; } diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp index f632e1f2627c..48bb17d52670 100644 --- a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp +++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp @@ -144,7 +144,7 @@ struct TFakeGenericClient: public NConnector::IClient { class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase { public: - explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, NConnector::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt) + explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, Generic::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt) : TOptimizeTransformerBase(types, NLog::EComponent::ProviderGeneric, {}) , DqSourceSettings_(dqSourceSettings) , DqSourceSettingsWereBuilt_(dqSourceSettingsWereBuilt) @@ -182,13 +182,13 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase { TString sourceType; dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1); UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric"); - UNIT_ASSERT(settings.Is()); + UNIT_ASSERT(settings.Is()); settings.UnpackTo(DqSourceSettings_); *DqSourceSettingsWereBuilt_ = true; } private: - NConnector::TSource* DqSourceSettings_; + Generic::TSource* DqSourceSettings_; bool* DqSourceSettingsWereBuilt_; }; @@ -207,7 +207,7 @@ struct TPushdownFixture: public NUnitTest::TBaseFixture { TAutoPtr Transformer; TAutoPtr BuildDqSourceSettingsTransformer; - NConnector::TSource DqSourceSettings; + Generic::TSource DqSourceSettings; bool DqSourceSettingsWereBuilt = false; TExprNode::TPtr InitialExprRoot; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 8bee8f7eb75f..74a6bd819177 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -102,7 +102,7 @@ namespace NYql { const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName]; const auto& endpoint = clusterConfig.endpoint(); - NConnector::TSource source; + Generic::TSource source; // for backward compability full path can be used (cluster_name.`db_name.table`) // TODO: simplify during https://st.yandex-team.ru/YQ-2494 @@ -149,10 +149,15 @@ namespace NYql { } // Managed YDB supports access via IAM token. - // Copy service account ids to obtain tokens during request execution phase. + // If exist, copy service account creds to obtain tokens during request execution phase. + // If exists, copy previously created token. if (clusterConfig.kind() == NConnector::NApi::EDataSourceKind::YDB) { source.SetServiceAccountId(clusterConfig.GetServiceAccountId()); source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature()); + source.SetToken(State_->Types->Credentials->FindCredentialContent( + "default_" + clusterConfig.name(), + "default_generic", + clusterConfig.GetToken())); } // preserve source description for read actor diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index 329bd634de3b..383e342c1523 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -262,20 +262,34 @@ namespace NYql { void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { auto dsi = request.mutable_data_source_instance(); - // If login/password is provided, just copy them into request + // If login/password is provided, just copy them into request: + // connector will use Basic Auth to access external data sources. if (clusterConfig.GetCredentials().Hasbasic()) { *dsi->mutable_credentials() = clusterConfig.GetCredentials(); return; } - Y_ENSURE(State_->CredentialsFactory, "CredentialsFactory is not initialized"); + // If there are no Basic Auth parameters, two options can be considered: + + // 1. Client provided own IAM-token to access external data source + auto iamToken = State_->Types->Credentials->FindCredentialContent( + "default_" + clusterConfig.name(), + "default_generic", + clusterConfig.GetToken()); + if (iamToken) { + *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; + *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; + return; + } - // If service account is provided, prepare to obtain IAM-token + // 2. Client provided service account creds that must be converted into IAM-token + Y_ENSURE(State_->CredentialsFactory, "CredentialsFactory is not initialized"); auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth( clusterConfig.GetServiceAccountId(), clusterConfig.GetServiceAccountIdSignature()) .ToJson(); + Y_ENSURE(structuredTokenJSON, "empty structured token"); // Create provider or get existing one. @@ -292,7 +306,7 @@ namespace NYql { .first; } - auto iamToken = providersIt->second->GetAuthInfo(); + iamToken = providersIt->second->GetAuthInfo(); Y_ENSURE(iamToken, "empty IAM token"); *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; From fc3ce13c14ff416f7eb40a235103b0acf5b15542 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Tue, 12 Mar 2024 22:09:08 +0300 Subject: [PATCH 6/7] Support Managed YDB as an external data source for YDB (#2662) --- ydb/core/fq/libs/compute/common/config.h | 2 +- .../actors/query_utils.cpp | 8 ++ .../ydb_control_plane_storage_queries.cpp | 2 +- .../kqp_federated_query_helpers.cpp | 7 +- .../external_data_source/manager.cpp | 9 +- ydb/core/kqp/host/kqp_host.cpp | 2 +- .../generic_ut/kqp_generic_provider_ut.cpp | 48 ++++----- ydb/core/testlib/test_client.cpp | 4 +- .../common/proto/gateways_config.proto | 10 +- .../connector/libcpp/ut_helpers/defaults.cpp | 1 - .../connector/libcpp/ut_helpers/defaults.h | 1 - .../provider/yql_generic_cluster_config.cpp | 99 ++++++++++++------- 12 files changed, 116 insertions(+), 77 deletions(-) diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h index 6acf4cc895e7..c0aa41ce441f 100644 --- a/ydb/core/fq/libs/compute/common/config.h +++ b/ydb/core/fq/libs/compute/common/config.h @@ -164,8 +164,8 @@ class TComputeConfig { case FederatedQuery::ConnectionSetting::kObjectStorage: case FederatedQuery::ConnectionSetting::kClickhouseCluster: case FederatedQuery::ConnectionSetting::kPostgresqlCluster: - return true; case FederatedQuery::ConnectionSetting::kYdbDatabase: + return true; case FederatedQuery::ConnectionSetting::kDataStreams: case FederatedQuery::ConnectionSetting::kMonitoring: case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp index 61f9bd21cf7e..f8fc94ca2c91 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp @@ -178,6 +178,14 @@ TString MakeCreateExternalDataSourceQuery( switch (connectionContent.setting().connection_case()) { case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: case FederatedQuery::ConnectionSetting::kYdbDatabase: + properties = fmt::format( + R"( + SOURCE_TYPE="Ydb", + DATABASE_ID={database_id}, + USE_TLS="{use_tls}" + )", + "database_id"_a = EncloseAndEscapeString(connectionContent.setting().ydb_database().database_id(), '"'), + "use_tls"_a = common.GetDisableSslForGenericDataSources() ? "false" : "true"); break; case FederatedQuery::ConnectionSetting::kClickhouseCluster: properties = fmt::format( diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 7ab56126f135..08dae4bbf5ed 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -28,7 +28,7 @@ FederatedQuery::IamAuth::IdentityCase GetIamAuth(const FederatedQuery::Connectio const auto& setting = connection.content().setting(); switch (setting.connection_case()) { case FederatedQuery::ConnectionSetting::kYdbDatabase: - return setting.data_streams().auth().identity_case(); + return setting.ydb_database().auth().identity_case(); case FederatedQuery::ConnectionSetting::kClickhouseCluster: return setting.clickhouse_cluster().auth().identity_case(); case FederatedQuery::ConnectionSetting::kObjectStorage: diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp index 6104e99ef9d0..3db5f119ad84 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -105,11 +105,14 @@ namespace NKikimr::NKqp { GenericGatewaysConfig}; // Init DatabaseAsyncResolver only if all requirements are met - if (DatabaseResolverActorId && GenericGatewaysConfig.HasMdbGateway() && MdbEndpointGenerator) { + if (DatabaseResolverActorId && + GenericGatewaysConfig.HasMdbGateway() && + GenericGatewaysConfig.HasYdbMvpEndpoint() && + MdbEndpointGenerator) { result.DatabaseAsyncResolver = std::make_shared( actorSystem, DatabaseResolverActorId.value(), - "", // TODO: use YDB Gateway endpoint? + GenericGatewaysConfig.GetYdbMvpEndpoint(), GenericGatewaysConfig.GetMdbGateway(), MdbEndpointGenerator); } diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp index f13d1155d788..cc280bb6b0f6 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp @@ -65,11 +65,12 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri } static const TSet properties { - "database_name", - "protocol", - "mdb_cluster_id", + "database_name", + "protocol", // managed PG, CH + "mdb_cluster_id", // managed PG, CH + "database_id", // managed YDB "use_tls", - "schema" + "schema", // managed PG }; for (const auto& property: properties) { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 2a91f095afb2..0cc2706a6d2d 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1493,7 +1493,7 @@ class TKqpHost : public IKqpHost { TypesCtx.Get(), FuncRegistry, FederatedQuerySetup->DatabaseAsyncResolver, - nullptr, + FederatedQuerySetup->CredentialsFactory, FederatedQuerySetup->ConnectorClient, FederatedQuerySetup->GenericGatewayConfig ); diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp index f176716a1f78..a57c404fa5a8 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp @@ -74,6 +74,22 @@ namespace NKikimr::NKqp { return settings; } + std::shared_ptr MakeDatabaseAsyncResolver(EProviderType providerType) { + std::shared_ptr databaseAsyncResolverMock; + + switch (providerType) { + case EProviderType::ClickHouse: + // We test access to managed databases only on the example of ClickHouse + databaseAsyncResolverMock = std::make_shared(); + databaseAsyncResolverMock->AddClickHouseCluster(); + break; + default: + break; + } + + return databaseAsyncResolverMock; + } + Y_UNIT_TEST_SUITE(GenericFederatedQuery) { void TestSelectAllFields(EProviderType providerType) { // prepare mock @@ -124,11 +140,7 @@ namespace NKikimr::NKqp { // clang-format on // prepare database resolver mock - std::shared_ptr databaseAsyncResolverMock; - if (providerType == EProviderType::ClickHouse) { - databaseAsyncResolverMock = std::make_shared(); - databaseAsyncResolverMock->AddClickHouseCluster(); - } + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); // run test auto appConfig = CreateDefaultAppConfig(); @@ -161,15 +173,15 @@ namespace NKikimr::NKqp { MATCH_RESULT_WITH_INPUT(colData, resultSet, GetUint16); } - Y_UNIT_TEST(PostgreSQLLocal) { + Y_UNIT_TEST(PostgreSQLOnPremSelectAll) { TestSelectAllFields(EProviderType::PostgreSQL); } - Y_UNIT_TEST(ClickHouseManaged) { + Y_UNIT_TEST(ClickHouseManagedSelectAll) { TestSelectAllFields(EProviderType::ClickHouse); } - Y_UNIT_TEST(YdbManaged) { + Y_UNIT_TEST(YdbManagedSelectAll) { TestSelectAllFields(EProviderType::Ydb); } @@ -220,11 +232,7 @@ namespace NKikimr::NKqp { // clang-format on // prepare database resolver mock - std::shared_ptr databaseAsyncResolverMock; - if (providerType == EProviderType::ClickHouse) { - databaseAsyncResolverMock = std::make_shared(); - databaseAsyncResolverMock->AddClickHouseCluster(); - } + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); // run test auto appConfig = CreateDefaultAppConfig(); @@ -256,7 +264,7 @@ namespace NKikimr::NKqp { } } - Y_UNIT_TEST(PostgreSQLSelectConstant) { + Y_UNIT_TEST(PostgreSQLOnPremSelectConstant) { TestSelectConstant(EProviderType::PostgreSQL); } @@ -315,11 +323,7 @@ namespace NKikimr::NKqp { // clang-format on // prepare database resolver mock - std::shared_ptr databaseAsyncResolverMock; - if (providerType == EProviderType::ClickHouse) { - databaseAsyncResolverMock = std::make_shared(); - databaseAsyncResolverMock->AddClickHouseCluster(); - } + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); // run test auto appConfig = CreateDefaultAppConfig(); @@ -422,11 +426,7 @@ namespace NKikimr::NKqp { // clang-format on // prepare database resolver mock - std::shared_ptr databaseAsyncResolverMock; - if (providerType == EProviderType::ClickHouse) { - databaseAsyncResolverMock = std::make_shared(); - databaseAsyncResolverMock->AddClickHouseCluster(); - } + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); // run test auto appConfig = CreateDefaultAppConfig(); diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index bc4a5645342b..e940492f285e 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -858,11 +858,11 @@ namespace Tests { ); std::shared_ptr databaseAsyncResolver; - if (queryServiceConfig.GetGeneric().HasMdbGateway() && queryServiceConfig.HasMdbTransformHost()) { + if (queryServiceConfig.GetGeneric().HasMdbGateway() || queryServiceConfig.GetGeneric().HasYdbMvpEndpoint()) { databaseAsyncResolver = std::make_shared( Runtime->GetActorSystem(nodeIdx), databaseResolverActorId, - "", + queryServiceConfig.GetGeneric().GetYdbMvpEndpoint(), queryServiceConfig.GetGeneric().GetMdbGateway(), NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost()) ); diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 865d42bc7bf8..1838bdd39710 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -608,9 +608,14 @@ message TGenericGatewayConfig { // Database clusters supported by this particular instance repeated TGenericClusterConfig ClusterMapping = 3; - // MDB API endpoint (do not fill in case of on-prem deployment) + // MDB API endpoint (no need to fill in case of on-prem deployment). optional string MdbGateway = 4; + // YDB MVP API endpoint (no need to fill in case of on-prem deployment). + // Expected format: + // [http|https]://host:port/ydbc/cloud-prod/ + optional string YdbMvpEndpoint = 7; + repeated TAttr DefaultSettings = 6; reserved 1, 2; @@ -619,7 +624,8 @@ message TGenericGatewayConfig { /////////////////////////////// Db Resolver /////////////////////////////////// message TDbResolverConfig { - // Ydb / Yds MVP endpoint. Expected format: + // Ydb / Yds MVP endpoint. + // Expected format: // [http|https]://host:port/ydbc/cloud-prod/ optional string YdbMvpEndpoint = 2; } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp index 09fbd4f8c599..7eed47039379 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp @@ -18,6 +18,5 @@ namespace NYql::NConnector::NTest { extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE = "sa_signature"; extern const TString DEFAULT_YDB_HOST = "localhost"; - extern const TString DEFAULT_YDB_DATABASE = "local"; extern const TString DEFAULT_YDB_ENDPOINT = TStringBuilder() << DEFAULT_YDB_HOST << ':' << DEFAULT_YDB_PORT; } // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h index f5e28b3e9f37..bbca9127a4bd 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h @@ -26,7 +26,6 @@ namespace NYql::NConnector::NTest { extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE; - extern const TString DEFAULT_YDB_DATABASE; extern const TString DEFAULT_YDB_HOST; constexpr int DEFAULT_YDB_PORT = 2136; extern const TString DEFAULT_YDB_ENDPOINT; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp index e0c9f14a2225..886b493076ee 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -11,8 +12,6 @@ #include "yql_generic_cluster_config.h" namespace NYql { - using namespace NConnector; - using namespace NConnector::NApi; using namespace fmt::literals; void ParseLogin( @@ -20,7 +19,8 @@ namespace NYql { NYql::TGenericClusterConfig& clusterConfig) { auto it = properties.find("login"); if (it == properties.cend()) { - ythrow yexception() << "missing 'LOGIN' value"; + // It's OK not to have credentials for base auth + return; } if (!it->second) { @@ -35,7 +35,8 @@ namespace NYql { NYql::TGenericClusterConfig& clusterConfig) { auto it = properties.find("password"); if (it == properties.cend()) { - ythrow yexception() << "missing 'PASSWORD' value"; + // It's OK not to have credentials for base auth + return; } clusterConfig.MutableCredentials()->Mutablebasic()->Setpassword(it->second); @@ -157,6 +158,25 @@ namespace NYql { clusterConfig.SetDatabaseId(it->second); } + void ParseDatabaseId(const THashMap& properties, + NYql::TGenericClusterConfig& clusterConfig) { + auto it = properties.find("database_id"); + if (it == properties.cend()) { + return; + } + + if (!it->second) { + // DATABASE_ID is an optional field + return; + } + + if (!it->second) { + ythrow yexception() << "invalid 'DATABASE_ID' value: '" << it->second << "'"; + } + + clusterConfig.SetDatabaseId(it->second); + } + void ParseSourceType(const THashMap& properties, NYql::TGenericClusterConfig& clusterConfig) { auto it = properties.find("source_type"); @@ -240,20 +260,6 @@ namespace NYql { } TGenericClusterConfig GenericClusterConfigFromProperties(const TString& clusterName, const THashMap& properties) { - // some cross-parameter validations - auto location = KeyIsSet(properties, "location"); - auto mdbClusterId = KeyIsSet(properties, "mdb_cluster_id"); - - if ((location && mdbClusterId) || (!location and !mdbClusterId)) { - ythrow yexception() << "you must provide either 'LOCATION' or 'MDB_CLUSTER_ID' parameter"; - } - - auto serviceAccountId = KeyIsSet(properties, "serviceAccountId"); - auto serviceAccountIdSignature = KeyIsSet(properties, "serviceAccountIdSignature"); - if ((serviceAccountId && !serviceAccountIdSignature) || (!serviceAccountId && serviceAccountIdSignature)) { - ythrow yexception() << "you must provide either both 'SERVICE_ACCOUNT_ID' and 'SERVICE_ACCOUNT_ID_SIGNATURE' parameters or none of them"; - } - NYql::TGenericClusterConfig clusterConfig; clusterConfig.set_name(clusterName); ParseLogin(properties, clusterConfig); @@ -263,6 +269,7 @@ namespace NYql { ParseDatabaseName(properties, clusterConfig); ParseSchema(properties, clusterConfig); ParseMdbClusterId(properties, clusterConfig); + ParseDatabaseId(properties, clusterConfig); ParseSourceType(properties, clusterConfig); ParseProtocol(properties, clusterConfig); ParseServiceAccountId(properties, clusterConfig); @@ -310,27 +317,16 @@ namespace NYql { "protocol"_a = NConnector::NApi::EProtocol_Name(clusterConfig.GetProtocol())); } + static const TSet managedDatabaseKinds{ + NConnector::NApi::EDataSourceKind::POSTGRESQL, + NConnector::NApi::EDataSourceKind::CLICKHOUSE, + NConnector::NApi::EDataSourceKind::YDB}; + void ValidateGenericClusterConfig( const NYql::TGenericClusterConfig& clusterConfig, const TString& context) { - // cross-parameter validations for optional fields - auto hasEndpoint = clusterConfig.HasEndpoint(); - auto databaseId = clusterConfig.GetDatabaseId(); - - if (hasEndpoint && databaseId) { - return ValidationError( - clusterConfig, - context, - "both 'Endpoint' and 'DatabaseId' fields are set; you must set only one of them"); - } - - if (!hasEndpoint and !databaseId) { - return ValidationError( - clusterConfig, - context, - "none of 'Endpoint' and 'DatabaseId' fields are set; you must set one of them"); - } - + // Service account ID and service account ID signature are tightly coupled: + // if one is set, another one must be set too. auto serviceAccountId = clusterConfig.GetServiceAccountId(); auto serviceAccountIdSignature = clusterConfig.GetServiceAccountIdSignature(); if (serviceAccountId && !serviceAccountIdSignature) { @@ -349,6 +345,8 @@ namespace NYql { "you must set either both 'ServiceAccountId' and 'ServiceAccountIdSignature' fields or none of them"); } + // Service account credentials and raw tokens are mutually exclusive: + // no need to specify service account parameters if one already has a token. auto token = clusterConfig.GetToken(); if ((serviceAccountId && serviceAccountIdSignature) && token) { return ValidationError( @@ -357,6 +355,31 @@ namespace NYql { "you must set either ('ServiceAccountId', 'ServiceAccountIdSignature') fields or 'Token' field or none of them"); } + // All managed databases: + // * set endpoint when working with on-prem instances + // * set database id when working with managed instances + if (managedDatabaseKinds.contains(clusterConfig.GetKind())) { + auto hasEndpoint = clusterConfig.HasEndpoint(); + auto hasDatabaseId = clusterConfig.HasDatabaseId(); + + if (hasEndpoint && hasDatabaseId) { + return ValidationError( + clusterConfig, + context, + "both 'Endpoint' and 'DatabaseId' fields are set; you must set only one of them"); + } + + if (!hasEndpoint and !hasDatabaseId) { + return ValidationError( + clusterConfig, + context, + "none of 'Endpoint' and 'DatabaseId' fields are set; you must set one of them"); + } + } + + // YDB: + // * set database name when working with on-prem YDB instance; + // * but set database ID when working with managed YDB. if (clusterConfig.GetKind() == NConnector::NApi::YDB) { if (clusterConfig.HasDatabaseName() && clusterConfig.HasDatabaseId()) { return ValidationError( @@ -378,14 +401,14 @@ namespace NYql { return ValidationError(clusterConfig, context, "empty field 'Name'"); } - if (clusterConfig.GetKind() == EDataSourceKind::DATA_SOURCE_KIND_UNSPECIFIED) { + if (clusterConfig.GetKind() == NConnector::NApi::EDataSourceKind::DATA_SOURCE_KIND_UNSPECIFIED) { return ValidationError(clusterConfig, context, "empty field 'Kind'"); } // TODO: validate Credentials.basic.password after ClickHouse recipe fix // TODO: validate DatabaseName field during https://st.yandex-team.ru/YQ-2494 - if (clusterConfig.GetProtocol() == EProtocol::PROTOCOL_UNSPECIFIED) { + if (clusterConfig.GetProtocol() == NConnector::NApi::EProtocol::PROTOCOL_UNSPECIFIED) { return ValidationError(clusterConfig, context, "empty field 'Protocol'"); } } From 115a6642a411d5b60b26711bf44edda080b44822 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Wed, 13 Mar 2024 10:40:05 +0300 Subject: [PATCH 7/7] Relax configuration requirements when starting `DatabaseAsyncResolver` from KQP (#2683) --- .../kqp/federated_query/kqp_federated_query_helpers.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp index 3db5f119ad84..aa6f8b3ac855 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -105,10 +105,8 @@ namespace NKikimr::NKqp { GenericGatewaysConfig}; // Init DatabaseAsyncResolver only if all requirements are met - if (DatabaseResolverActorId && - GenericGatewaysConfig.HasMdbGateway() && - GenericGatewaysConfig.HasYdbMvpEndpoint() && - MdbEndpointGenerator) { + if (DatabaseResolverActorId && MdbEndpointGenerator && + (GenericGatewaysConfig.HasMdbGateway() || GenericGatewaysConfig.HasYdbMvpEndpoint())) { result.DatabaseAsyncResolver = std::make_shared( actorSystem, DatabaseResolverActorId.value(),