diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index cd04a3530aec..3db5257f884a 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -46,6 +46,10 @@ IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vectorAddClusterMapping(); + auto* clusterCfg = gatewaysConfig.MutableGeneric()->AddClusterMapping(); + clusterCfg->SetKind(NYql::NConnector::NApi::EDataSourceKind::YDB); + clusterCfg->SetProtocol(NYql::NConnector::NApi::EProtocol::NATIVE); clusterCfg->SetName(connectionName); - clusterCfg->SetId(db.database_id()); - if (db.database()) - clusterCfg->SetDatabase(db.database()); - if (db.endpoint()) - clusterCfg->SetEndpoint(db.endpoint()); - clusterCfg->SetSecure(db.secure()); - clusterCfg->SetAddBearerToToken(common.GetUseBearerForYdb()); + clusterCfg->SetDatabaseId(db.database_id()); + clusterCfg->SetUseSsl(!common.GetDisableSslForGenericDataSources()); FillClusterAuth(*clusterCfg, db.auth(), authToken, accountIdSignatures); - clusters.emplace(connectionName, YdbProviderName); + clusters.emplace(connectionName, GenericProviderName); break; } case FederatedQuery::ConnectionSetting::kClickhouseCluster: { diff --git a/ydb/core/fq/libs/actors/database_resolver.cpp b/ydb/core/fq/libs/actors/database_resolver.cpp index f0c7e8b16a15..91a0ec4963ea 100644 --- a/ydb/core/fq/libs/actors/database_resolver.cpp +++ b/ydb/core/fq/libs/actors/database_resolver.cpp @@ -1,5 +1,6 @@ #include "database_resolver.h" +#include #include #include #include @@ -128,7 +129,7 @@ class TResponseProcessor : public TActorBootstrapped const auto requestIter = Requests.find(ev->Get()->Request); HandledIds++; - LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got MDB API response: code=" << ev->Get()->Response->Status); + LOG_T("ResponseProcessor::Handle(HttpIncomingResponse): got API response: code=" << 
ev->Get()->Response->Status); try { HandleResponse(ev, requestIter, result); @@ -305,7 +306,12 @@ class TDatabaseResolver: public TActor } Y_ENSURE(endpoint); - return TDatabaseDescription{endpoint, "", 0, database, secure}; + + TVector split = StringSplitter(endpoint).Split(':'); + + Y_ENSURE(split.size() == 2); + + return TDatabaseDescription{endpoint, split[0], FromString(split[1]), database, secure}; }; Parsers[NYql::EDatabaseType::Ydb] = ydbParser; Parsers[NYql::EDatabaseType::DataStreams] = [ydbParser]( @@ -320,9 +326,11 @@ class TDatabaseResolver: public TActor if (!isDedicatedDb && ret.Endpoint.StartsWith("ydb.")) { // Replace "ydb." -> "yds." ret.Endpoint[2] = 's'; + ret.Host[2] = 's'; } if (isDedicatedDb) { ret.Endpoint = "u-" + ret.Endpoint; + ret.Host = "u-" + ret.Host; } return ret; }; @@ -479,6 +487,7 @@ class TDatabaseResolver: public TActor try { TString url; if (IsIn({NYql::EDatabaseType::Ydb, NYql::EDatabaseType::DataStreams }, databaseType)) { + YQL_ENSURE(ev->Get()->YdbMvpEndpoint.Size() > 0, "empty YDB MVP Endpoint"); url = TUrlBuilder(ev->Get()->YdbMvpEndpoint + "/database") .AddUrlParam("databaseId", databaseId) .Build(); @@ -490,7 +499,6 @@ class TDatabaseResolver: public TActor .AddPathComponent("hosts") .Build(); } - LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << url); NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet(url); @@ -500,6 +508,8 @@ class TDatabaseResolver: public TActor httpRequest->Set("Authorization", token); } + LOG_D("ResponseProccessor::Handle(EndpointRequest): start GET request: " << "url: " << httpRequest->URL); + requests[httpRequest] = TResolveParams{databaseId, databaseType, databaseAuth}; } catch (const std::exception& e) { const TString msg = TStringBuilder() << "error while preparing to resolve database id: " << databaseId diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 4ead981b5004..80790ccb9f7e 
100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -30,7 +30,6 @@ #include #include #include -#include #include #include #include @@ -1940,11 +1939,7 @@ class TRunActor : public NActors::TActorBootstrapped { } { - dataProvidersInit.push_back(GetYdbDataProviderInitializer(Params.YqSharedResources->UserSpaceYdbDriver, Params.CredentialsFactory, dbResolver)); - } - - { - dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver)); + dataProvidersInit.push_back(GetGenericDataProviderInitializer(Params.ConnectorClient, dbResolver, Params.CredentialsFactory)); } { diff --git a/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp b/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp index ff9705190512..57a552c7a802 100644 --- a/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp +++ b/ydb/core/fq/libs/actors/ut/database_resolver_ut.cpp @@ -174,8 +174,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { })", NYql::TDatabaseResolverResponse::TDatabaseDescription{ TString{"ydb.serverless.yandexcloud.net:2135"}, - TString{""}, - 0, + TString{"ydb.serverless.yandexcloud.net"}, + 2135, TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"), true }, @@ -195,8 +195,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { })", NYql::TDatabaseResolverResponse::TDatabaseDescription{ TString{"yds.serverless.yandexcloud.net:2135"}, - TString{""}, - 0, + TString{"yds.serverless.yandexcloud.net"}, + 2135, TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"), true }, @@ -217,8 +217,8 @@ Y_UNIT_TEST_SUITE(TDatabaseResolverTests) { })", NYql::TDatabaseResolverResponse::TDatabaseDescription{ TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net:2135"}, - TString{""}, - 0, + TString{"u-lb.etn021us5r9rhld1vgbh.ydb.mdb.yandexcloud.net"}, + 2135, TString("/ru-central1/b1g7jdjqd07qg43c4fmp/etn021us5r9rhld1vgbh"), true }, diff --git a/ydb/core/fq/libs/actors/ya.make 
b/ydb/core/fq/libs/actors/ya.make index bcf906c56a51..42d277fbc98c 100644 --- a/ydb/core/fq/libs/actors/ya.make +++ b/ydb/core/fq/libs/actors/ya.make @@ -80,7 +80,6 @@ PEERDIR( ydb/library/yql/providers/pq/provider ydb/library/yql/providers/pq/task_meta ydb/library/yql/providers/s3/provider - ydb/library/yql/providers/ydb/provider ydb/library/yql/public/issue ydb/library/yql/public/issue/protos ydb/library/yql/sql/settings diff --git a/ydb/core/fq/libs/compute/common/config.h b/ydb/core/fq/libs/compute/common/config.h index 6acf4cc895e7..c0aa41ce441f 100644 --- a/ydb/core/fq/libs/compute/common/config.h +++ b/ydb/core/fq/libs/compute/common/config.h @@ -164,8 +164,8 @@ class TComputeConfig { case FederatedQuery::ConnectionSetting::kObjectStorage: case FederatedQuery::ConnectionSetting::kClickhouseCluster: case FederatedQuery::ConnectionSetting::kPostgresqlCluster: - return true; case FederatedQuery::ConnectionSetting::kYdbDatabase: + return true; case FederatedQuery::ConnectionSetting::kDataStreams: case FederatedQuery::ConnectionSetting::kMonitoring: case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp index 61f9bd21cf7e..f8fc94ca2c91 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp @@ -178,6 +178,14 @@ TString MakeCreateExternalDataSourceQuery( switch (connectionContent.setting().connection_case()) { case FederatedQuery::ConnectionSetting::CONNECTION_NOT_SET: case FederatedQuery::ConnectionSetting::kYdbDatabase: + properties = fmt::format( + R"( + SOURCE_TYPE="Ydb", + DATABASE_ID={database_id}, + USE_TLS="{use_tls}" + )", + "database_id"_a = EncloseAndEscapeString(connectionContent.setting().ydb_database().database_id(), '"'), + "use_tls"_a = common.GetDisableSslForGenericDataSources() ? 
"false" : "true"); break; case FederatedQuery::ConnectionSetting::kClickhouseCluster: properties = fmt::format( diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 7ab56126f135..08dae4bbf5ed 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -28,7 +28,7 @@ FederatedQuery::IamAuth::IdentityCase GetIamAuth(const FederatedQuery::Connectio const auto& setting = connection.content().setting(); switch (setting.connection_case()) { case FederatedQuery::ConnectionSetting::kYdbDatabase: - return setting.data_streams().auth().identity_case(); + return setting.ydb_database().auth().identity_case(); case FederatedQuery::ConnectionSetting::kClickhouseCluster: return setting.clickhouse_cluster().auth().identity_case(); case FederatedQuery::ConnectionSetting::kObjectStorage: diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 1dd183dcae6f..c5413c6a698b 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -177,7 +177,10 @@ void Init( &protoConfig.GetGateways().GetHttpGateway(), yqCounters->GetSubgroup("subcomponent", "http_gateway")); - const auto connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector()); + NYql::NConnector::IClient::TPtr connectorClient = nullptr; + if (protoConfig.GetGateways().GetGeneric().HasConnector()) { + connectorClient = NYql::NConnector::MakeClientGRPC(protoConfig.GetGateways().GetGeneric().GetConnector()); + } if (protoConfig.GetTokenAccessor().GetEnabled()) { const auto& tokenAccessorConfig = protoConfig.GetTokenAccessor(); diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp index 6104e99ef9d0..aa6f8b3ac855 100644 --- 
a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -105,11 +105,12 @@ namespace NKikimr::NKqp { GenericGatewaysConfig}; // Init DatabaseAsyncResolver only if all requirements are met - if (DatabaseResolverActorId && GenericGatewaysConfig.HasMdbGateway() && MdbEndpointGenerator) { + if (DatabaseResolverActorId && MdbEndpointGenerator && + (GenericGatewaysConfig.HasMdbGateway() || GenericGatewaysConfig.HasYdbMvpEndpoint())) { result.DatabaseAsyncResolver = std::make_shared( actorSystem, DatabaseResolverActorId.value(), - "", // TODO: use YDB Gateway endpoint? + GenericGatewaysConfig.GetYdbMvpEndpoint(), GenericGatewaysConfig.GetMdbGateway(), MdbEndpointGenerator); } diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp index f13d1155d788..cc280bb6b0f6 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp @@ -65,11 +65,12 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri } static const TSet properties { - "database_name", - "protocol", - "mdb_cluster_id", + "database_name", + "protocol", // managed PG, CH + "mdb_cluster_id", // managed PG, CH + "database_id", // managed YDB "use_tls", - "schema" + "schema", // managed PG }; for (const auto& property: properties) { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 1434efffa11b..0cc2706a6d2d 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1493,6 +1493,7 @@ class TKqpHost : public IKqpHost { TypesCtx.Get(), FuncRegistry, FederatedQuerySetup->DatabaseAsyncResolver, + FederatedQuerySetup->CredentialsFactory, FederatedQuerySetup->ConnectorClient, FederatedQuerySetup->GenericGatewayConfig ); diff --git 
a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp index 87910903ac18..a57c404fa5a8 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp @@ -31,6 +31,7 @@ namespace NKikimr::NKqp { enum class EProviderType { PostgreSQL, ClickHouse, + Ydb, }; NApi::TDataSourceInstance MakeDataSourceInstance(EProviderType providerType) { @@ -39,6 +40,8 @@ namespace NKikimr::NKqp { return TConnectorClientMock::TPostgreSQLDataSourceInstanceBuilder<>().GetResult(); case EProviderType::ClickHouse: return TConnectorClientMock::TClickHouseDataSourceInstanceBuilder<>().GetResult(); + case EProviderType::Ydb: + return TConnectorClientMock::TYdbDataSourceInstanceBuilder<>().GetResult(); } } @@ -48,6 +51,8 @@ namespace NKikimr::NKqp { return CreatePostgreSQLExternalDataSource(kikimr); case EProviderType::ClickHouse: return CreateClickHouseExternalDataSource(kikimr); + case EProviderType::Ydb: + return CreateYdbExternalDataSource(kikimr); } } @@ -69,6 +74,22 @@ namespace NKikimr::NKqp { return settings; } + std::shared_ptr MakeDatabaseAsyncResolver(EProviderType providerType) { + std::shared_ptr databaseAsyncResolverMock; + + switch (providerType) { + case EProviderType::ClickHouse: + // We test access to managed databases only on the example of ClickHouse + databaseAsyncResolverMock = std::make_shared(); + databaseAsyncResolverMock->AddClickHouseCluster(); + break; + default: + break; + } + + return databaseAsyncResolverMock; + } + Y_UNIT_TEST_SUITE(GenericFederatedQuery) { void TestSelectAllFields(EProviderType providerType) { // prepare mock @@ -103,7 +124,6 @@ namespace NKikimr::NKqp { // step 3: ReadSplits std::vector colData = {10, 20, 30, 40, 50}; clientMock->ExpectReadSplits() - .DataSourceInstance(dataSourceInstance) .Split() .Description("some binary description") .Select() @@ -120,11 
+140,7 @@ namespace NKikimr::NKqp { // clang-format on // prepare database resolver mock - std::shared_ptr databaseAsyncResolverMock; - if (providerType == EProviderType::ClickHouse) { - databaseAsyncResolverMock = std::make_shared(); - databaseAsyncResolverMock->AddClickHouseCluster(); - } + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); // run test auto appConfig = CreateDefaultAppConfig(); @@ -157,14 +173,18 @@ namespace NKikimr::NKqp { MATCH_RESULT_WITH_INPUT(colData, resultSet, GetUint16); } - Y_UNIT_TEST(PostgreSQLLocal) { + Y_UNIT_TEST(PostgreSQLOnPremSelectAll) { TestSelectAllFields(EProviderType::PostgreSQL); } - Y_UNIT_TEST(ClickHouseManaged) { + Y_UNIT_TEST(ClickHouseManagedSelectAll) { TestSelectAllFields(EProviderType::ClickHouse); } + Y_UNIT_TEST(YdbManagedSelectAll) { + TestSelectAllFields(EProviderType::Ydb); + } + void TestSelectConstant(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -199,7 +219,6 @@ namespace NKikimr::NKqp { // step 3: ReadSplits clientMock->ExpectReadSplits() - .DataSourceInstance(dataSourceInstance) .Split() .Description("some binary description") .Select() @@ -213,11 +232,7 @@ namespace NKikimr::NKqp { // clang-format on // prepare database resolver mock - std::shared_ptr databaseAsyncResolverMock; - if (providerType == EProviderType::ClickHouse) { - databaseAsyncResolverMock = std::make_shared(); - databaseAsyncResolverMock->AddClickHouseCluster(); - } + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); // run test auto appConfig = CreateDefaultAppConfig(); @@ -249,7 +264,7 @@ namespace NKikimr::NKqp { } } - Y_UNIT_TEST(PostgreSQLSelectConstant) { + Y_UNIT_TEST(PostgreSQLOnPremSelectConstant) { TestSelectConstant(EProviderType::PostgreSQL); } @@ -257,6 +272,10 @@ namespace NKikimr::NKqp { TestSelectConstant(EProviderType::ClickHouse); } + Y_UNIT_TEST(YdbManagedSelectConstant) { + TestSelectConstant(EProviderType::Ydb); + } + void 
TestSelectCount(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -291,7 +310,6 @@ namespace NKikimr::NKqp { // step 3: ReadSplits clientMock->ExpectReadSplits() - .DataSourceInstance(dataSourceInstance) .Split() .Description("some binary description") .Select() @@ -305,11 +323,7 @@ namespace NKikimr::NKqp { // clang-format on // prepare database resolver mock - std::shared_ptr databaseAsyncResolverMock; - if (providerType == EProviderType::ClickHouse) { - databaseAsyncResolverMock = std::make_shared(); - databaseAsyncResolverMock->AddClickHouseCluster(); - } + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); // run test auto appConfig = CreateDefaultAppConfig(); @@ -345,6 +359,10 @@ namespace NKikimr::NKqp { TestSelectCount(EProviderType::ClickHouse); } + Y_UNIT_TEST(YdbSelectCount) { + TestSelectCount(EProviderType::Ydb); + } + void TestFilterPushdown(EProviderType providerType) { // prepare mock auto clientMock = std::make_shared(); @@ -396,7 +414,6 @@ namespace NKikimr::NKqp { std::vector filterColumnData = {42, 24}; // clang-format off clientMock->ExpectReadSplits() - .DataSourceInstance(dataSourceInstance) .Split() .Description("some binary description") .Select(select) @@ -409,11 +426,7 @@ namespace NKikimr::NKqp { // clang-format on // prepare database resolver mock - std::shared_ptr databaseAsyncResolverMock; - if (providerType == EProviderType::ClickHouse) { - databaseAsyncResolverMock = std::make_shared(); - databaseAsyncResolverMock->AddClickHouseCluster(); - } + auto databaseAsyncResolverMock = MakeDatabaseAsyncResolver(providerType); // run test auto appConfig = CreateDefaultAppConfig(); @@ -450,5 +463,9 @@ namespace NKikimr::NKqp { Y_UNIT_TEST(ClickHouseFilterPushdown) { TestFilterPushdown(EProviderType::ClickHouse); } + + Y_UNIT_TEST(YdbFilterPushdown) { + TestFilterPushdown(EProviderType::Ydb); + } } } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 
bc4a5645342b..e940492f285e 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -858,11 +858,11 @@ namespace Tests { ); std::shared_ptr databaseAsyncResolver; - if (queryServiceConfig.GetGeneric().HasMdbGateway() && queryServiceConfig.HasMdbTransformHost()) { + if (queryServiceConfig.GetGeneric().HasMdbGateway() || queryServiceConfig.GetGeneric().HasYdbMvpEndpoint()) { databaseAsyncResolver = std::make_shared( Runtime->GetActorSystem(nodeIdx), databaseResolverActorId, - "", + queryServiceConfig.GetGeneric().GetYdbMvpEndpoint(), queryServiceConfig.GetGeneric().GetMdbGateway(), NFq::MakeMdbEndpointGeneratorGeneric(queryServiceConfig.GetMdbTransformHost()) ); diff --git a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h index b5fcf35bc8e2..eef520630710 100644 --- a/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h +++ b/ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h @@ -22,6 +22,8 @@ inline EDatabaseType DatabaseTypeFromDataSourceKind(NConnector::NApi::EDataSourc return EDatabaseType::PostgreSQL; case NConnector::NApi::EDataSourceKind::CLICKHOUSE: return EDatabaseType::ClickHouse; + case NConnector::NApi::EDataSourceKind::YDB: + return EDatabaseType::Ydb; default: ythrow yexception() << "Unknown data source kind: " << NConnector::NApi::EDataSourceKind_Name(dataSourceKind); } @@ -33,6 +35,8 @@ inline NConnector::NApi::EDataSourceKind DatabaseTypeToDataSourceKind(EDatabaseT return NConnector::NApi::EDataSourceKind::POSTGRESQL; case EDatabaseType::ClickHouse: return NConnector::NApi::EDataSourceKind::CLICKHOUSE; + case EDatabaseType::Ydb: + return NConnector::NApi::EDataSourceKind::YDB; default: ythrow yexception() << "Unknown database type: " << ToString(databaseType); } diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto 
b/ydb/library/yql/providers/common/proto/gateways_config.proto index fb845b8cee7b..1838bdd39710 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -563,12 +563,11 @@ message TGenericClusterConfig { // Credentials used to access data source instance optional NYql.NConnector.NApi.TCredentials Credentials = 10; - // Credentials used to access MDB API. - // When working with data source instances deployed in a cloud, - // you should either set (ServiceAccountId, ServiceAccountIdSignature) pair, - // or set IAM Token. - // The names of these fields must satisfy this template function: - // https://github.com/ydb-platform/ydb/arcadia/contrib/ydb/core/fq/libs/actors/clusters_from_connections.cpp?rev=r11823087#L19 + // Credentials used to access managed databases APIs. + // When working with external data source instances deployed in clouds, + // one should either set (ServiceAccountId, ServiceAccountIdSignature) pair + // that will be resolved into IAM Token via Token Accessor, + // or provide IAM Token directly. optional string ServiceAccountId = 6; optional string ServiceAccountIdSignature = 7; optional string Token = 11; @@ -592,9 +591,11 @@ message TGenericClusterConfig { message TGenericConnectorConfig { // Connector instance network endpoint optional NYql.NConnector.NApi.TEndpoint Endpoint = 3; - // If true, GRPC Client will use TLS encryption. - // Server cert will be verified with system CA cert pool. + // If true, Connector GRPC Client will use TLS encryption. optional bool UseSsl = 4; + // Path to the custom CA certificate to verify Connector's certs. + // If empty, the default system CA certificate pool will be used. 
+ optional string SslCaCrt = 5; reserved 1, 2; } @@ -607,9 +608,14 @@ message TGenericGatewayConfig { // Database clusters supported by this particular instance repeated TGenericClusterConfig ClusterMapping = 3; - // MDB API endpoint (do not fill in case of on-prem deployment) + // MDB API endpoint (no need to fill in case of on-prem deployment). optional string MdbGateway = 4; + // YDB MVP API endpoint (no need to fill in case of on-prem deployment). + // Expected format: + // [http|https]://host:port/ydbc/cloud-prod/ + optional string YdbMvpEndpoint = 7; + repeated TAttr DefaultSettings = 6; reserved 1, 2; @@ -618,7 +624,9 @@ message TGenericGatewayConfig { /////////////////////////////// Db Resolver /////////////////////////////////// message TDbResolverConfig { - // Ydb / Yds mvp endpoint + // Ydb / Yds MVP endpoint. + // Expected format: + // [http|https]://host:port/ydbc/cloud-prod/ optional string YdbMvpEndpoint = 2; } diff --git a/ydb/library/yql/providers/generic/actors/ya.make b/ydb/library/yql/providers/generic/actors/ya.make index 31ec4480c9ef..53f40afdca7c 100644 --- a/ydb/library/yql/providers/generic/actors/ya.make +++ b/ydb/library/yql/providers/generic/actors/ya.make @@ -3,15 +3,18 @@ LIBRARY() SRCS( yql_generic_read_actor.cpp yql_generic_source_factory.cpp + yql_generic_token_provider.cpp ) PEERDIR( ydb/library/yql/dq/actors/compute ydb/library/yql/minikql/computation + ydb/library/yql/providers/common/structured_token ydb/library/yql/providers/common/token_accessor/client ydb/library/yql/providers/generic/proto ydb/library/yql/public/types ydb/library/yql/providers/generic/connector/libcpp + ydb/public/sdk/cpp/client/ydb_types/credentials ) YQL_LAST_ABI_VERSION() diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 7de4f0a04ea1..51c02bb40456 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ 
b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -1,4 +1,5 @@ #include "yql_generic_read_actor.h" +#include "yql_generic_token_provider.h" #include #include @@ -12,10 +13,10 @@ #include #include #include -#include #include #include #include +#include namespace NYql::NDq { @@ -102,16 +103,16 @@ namespace NYql::NDq { ui64 inputIndex, TCollectStatsLevel statsLevel, NConnector::IClient::TPtr client, - const NConnector::NApi::TSelect& select, - const NConnector::NApi::TDataSourceInstance& dataSourceInstance, + TGenericTokenProvider::TPtr tokenProvider, + Generic::TSource&& source, const NActors::TActorId& computeActorId, const NKikimr::NMiniKQL::THolderFactory& holderFactory) : InputIndex_(inputIndex) , ComputeActorId_(computeActorId) , Client_(std::move(client)) + , TokenProvider_(std::move(tokenProvider)) , HolderFactory_(holderFactory) - , Select_(select) - , DataSourceInstance_(dataSourceInstance) + , Source_(std::move(source)) // take ownership: a named rvalue reference is an lvalue, so a plain init would copy the protobuf { IngressStats_.Level = statsLevel; } @@ -143,7 +144,9 @@ namespace NYql::NDq { // Prepare request NConnector::NApi::TListSplitsRequest request; - *request.mutable_selects()->Add() = Select_; + NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source + TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance()); + *request.mutable_selects()->Add() = std::move(select); // Initialize stream Client_->ListSplits(request).Subscribe( @@ -236,8 +239,11 @@ namespace NYql::NDq { std::for_each( Splits_.cbegin(), Splits_.cend(), - [&](const NConnector::NApi::TSplit& split) { request.mutable_splits()->Add()->CopyFrom(split); }); - request.mutable_data_source_instance()->CopyFrom(DataSourceInstance_); + [&](const NConnector::NApi::TSplit& split) { + NConnector::NApi::TSplit splitCopy = split; + TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance()); + *request.mutable_splits()->Add() = std::move(splitCopy); // send the token-enriched copy; the original split carries no credentials + }); // Start streaming Client_->ReadSplits(request).Subscribe( @@
-403,8 +409,8 @@ namespace NYql::NDq { // It's very important to fill UV columns in the alphabet order, // paying attention to the scalar field containing block length. TVector fieldNames; - std::transform(Select_.what().items().cbegin(), - Select_.what().items().cend(), + std::transform(Source_.select().what().items().cbegin(), + Source_.select().what().items().cend(), std::back_inserter(fieldNames), [](const auto& item) { return item.column().name(); }); @@ -484,6 +490,7 @@ namespace NYql::NDq { const NActors::TActorId ComputeActorId_; NConnector::IClient::TPtr Client_; + TGenericTokenProvider::TPtr TokenProvider_; NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_; TVector Splits_; // accumulated list of table splits NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_; @@ -492,22 +499,21 @@ namespace NYql::NDq { NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_; const NKikimr::NMiniKQL::THolderFactory& HolderFactory_; - const NYql::NConnector::NApi::TSelect Select_; - const NYql::NConnector::NApi::TDataSourceInstance DataSourceInstance_; + Generic::TSource Source_; }; std::pair CreateGenericReadActor(NConnector::IClient::TPtr genericClient, - Generic::TSource&& params, + Generic::TSource&& source, ui64 inputIndex, TCollectStatsLevel statsLevel, const THashMap& /*secureParams*/, const THashMap& /*taskParams*/, const NActors::TActorId& computeActorId, - ISecuredServiceAccountCredentialsFactory::TPtr /*credentialsFactory*/, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const NKikimr::NMiniKQL::THolderFactory& holderFactory) { - const auto dsi = params.select().data_source_instance(); + const auto dsi = source.select().data_source_instance(); YQL_CLOG(INFO, ProviderGeneric) << "Creating read actor with params:" << " kind=" << NYql::NConnector::NApi::EDataSourceKind_Name(dsi.kind()) << ", endpoint=" << dsi.endpoint().ShortDebugString() @@ -526,6 +532,7 @@ namespace NYql::NDq { YQL_ENSURE(one != 
TString::npos && two != TString::npos && one < two, "Bad token format:" << token); */ + // Obtain token to access remote data source if necessary // TODO: partitioning is not implemented now, but this code will be useful for the further research: /* TStringBuilder part; @@ -539,12 +546,14 @@ namespace NYql::NDq { part << ';'; */ + auto tokenProvider = CreateGenericTokenProvider(source, credentialsFactory); + const auto actor = new TGenericReadActor( inputIndex, statsLevel, genericClient, - params.select(), - dsi, + std::move(tokenProvider), + std::move(source), computeActorId, holderFactory); diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp index b35709877445..3f0cce4a6dc9 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp @@ -16,7 +16,7 @@ namespace NYql::NDq { args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory); }; - for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric"}) { + for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric"}) { factory.RegisterSource(sourceName, genericFactory); } } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp new file mode 100644 index 000000000000..e8430b87e9ec --- /dev/null +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp @@ -0,0 +1,67 @@ +#include "yql_generic_token_provider.h" + +#include + +namespace NYql::NDq { + TGenericTokenProvider::TGenericTokenProvider( + const NYql::Generic::TSource& source, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) + : Source_(source) + , StaticIAMToken_(source.GetToken()) + , CredentialsProvider_(nullptr) + { + // 1. 
User has provided IAM-token itself. + // This token will be used during the whole lifetime of a read actor. + if (!StaticIAMToken_.empty()) { + return; + } + + // 2. User has provided service account creds. + // We create token accessor client that will renew token accessor by demand. + if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) { + Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized"); + + auto structuredTokenJSON = + TStructuredTokenBuilder() + .SetServiceAccountIdAuth(source.GetServiceAccountId(), source.GetServiceAccountIdSignature()) + .ToJson(); + + // If service account is provided, obtain IAM-token + Y_ENSURE(structuredTokenJSON, "empty structured token"); + + auto credentialsProviderFactory = + CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false); + CredentialsProvider_ = credentialsProviderFactory->CreateProvider(); + } + + // 3. If we reached this point, it means that user doesn't need token auth. + } + + void TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const { + // 1. Don't need tokens if basic auth is set + if (dsi.credentials().has_basic()) { + return; + } + + *dsi.mutable_credentials()->mutable_token()->mutable_type() = "IAM"; + + // 2. If static IAM-token has been provided, use it + if (!StaticIAMToken_.empty()) { + *dsi.mutable_credentials()->mutable_token()->mutable_value() = StaticIAMToken_; + return; + } + + // 3. 
Otherwise use credentials provider to get token + Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized"); + + auto iamToken = CredentialsProvider_->GetAuthInfo(); + Y_ENSURE(iamToken, "CredentialsProvider returned empty IAM token"); + + *dsi.mutable_credentials()->mutable_token()->mutable_value() = std::move(iamToken); + } + + TGenericTokenProvider::TPtr + CreateGenericTokenProvider(const NYql::Generic::TSource& source, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) { + return std::make_unique(source, credentialsFactory); + } +} //namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h new file mode 100644 index 000000000000..495a44c15e57 --- /dev/null +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h @@ -0,0 +1,30 @@ +#pragma once + +#include +#include +#include + +namespace NYql::NDq { + // When accessing external data sources using authentication via tokens, + // there are two options: + // 1. Use static IAM-token provided by user (especially useful during debugging); + // 2. Use service account credentials in order to get (and refresh) IAM-token by demand. 
+ class TGenericTokenProvider { + public: + using TPtr = std::unique_ptr; + + TGenericTokenProvider(const NYql::Generic::TSource& source, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); + + void MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const; + + private: + NYql::Generic::TSource Source_; + TString StaticIAMToken_; + NYdb::TCredentialsProviderPtr CredentialsProvider_; + }; + + TGenericTokenProvider::TPtr + CreateGenericTokenProvider(const NYql::Generic::TSource& source, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); +} //namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/connector/api/common/data_source.proto b/ydb/library/yql/providers/generic/connector/api/common/data_source.proto index b29eb7e16a08..ecfb64665c9e 100644 --- a/ydb/library/yql/providers/generic/connector/api/common/data_source.proto +++ b/ydb/library/yql/providers/generic/connector/api/common/data_source.proto @@ -13,8 +13,14 @@ message TCredentials { string password = 2; } + message TToken { + string type = 1; + string value = 2; + } + oneof payload { TBasic basic = 1; + TToken token = 2; } } diff --git a/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto b/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto index 67cd9588547c..7004f2686136 100644 --- a/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto +++ b/ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto @@ -197,8 +197,10 @@ message TSplit { // ReadDataRequest reads the data associated with a particular table split. message TReadSplitsRequest { - // Data source instance to connect - TDataSourceInstance data_source_instance = 1; + // Data source instance to connect. + // Deprecated field: server implementations must rely on + // TDataSourceInstance provided in each TSelect. 
+ TDataSourceInstance data_source_instance = 1 [deprecated = true]; // Splits that YQ engine would like to read. repeated TSplit splits = 2; diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp index 9d6237808377..8280a4e36886 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp @@ -1,3 +1,5 @@ +#include + #include "client.h" namespace NYql::NConnector { @@ -21,10 +23,22 @@ namespace NYql::NConnector { public: TClientGRPC() = delete; TClientGRPC(const TGenericConnectorConfig& config) { - TString endpoint = TStringBuilder() << config.GetEndpoint().host() << ":" << ToString(config.GetEndpoint().port()); - GrpcConfig_ = NYdbGrpc::TGRpcClientConfig(endpoint); + GrpcConfig_ = NYdbGrpc::TGRpcClientConfig(); + + Y_ENSURE(config.GetEndpoint().host(), TStringBuilder() << "Empty host in TGenericConnectorConfig: " << config.DebugString()); + Y_ENSURE(config.GetEndpoint().port(), TStringBuilder() << "Empty port in TGenericConnectorConfig: " << config.DebugString()); + GrpcConfig_.Locator = TStringBuilder() << config.GetEndpoint().host() << ":" << config.GetEndpoint().port(); + GrpcConfig_.EnableSsl = config.GetUseSsl(); + // Read content of CA cert + TString rootCertData; + if (config.GetSslCaCrt()) { + rootCertData = TFileInput(config.GetSslCaCrt()).ReadAll(); + } + + GrpcConfig_.SslCredentials = grpc::SslCredentialsOptions{.pem_root_certs = rootCertData, .pem_private_key = "", .pem_cert_chain = ""}; + GrpcClient_ = std::make_unique(); // FIXME: is it OK to use single connection during the client lifetime? 
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.h b/ydb/library/yql/providers/generic/connector/libcpp/client.h index 1d066e31b72a..7a2250798eb8 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.h @@ -17,8 +17,6 @@ namespace NYql::NConnector { using TAsyncResult = NThreading::TFuture>; using TDescribeTableAsyncResult = TAsyncResult; - // using TListSplitsAsyncResult = TAsyncResult; - // using TReadSplitsAsyncResult = TAsyncResult; template class TStreamer { diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp index 172194397add..f294f08ce4ef 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.cpp @@ -107,6 +107,42 @@ namespace NYql::NConnector::NTest { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); } + void CreateYdbExternalDataSource( + const std::shared_ptr& kikimr, + const TString& dataSourceName, + const TString& login, + const TString& password, + const TString& endpoint, + bool useTls, + const TString& databaseName) + { + auto tc = kikimr->GetTableClient(); + auto session = tc.CreateSession().GetValueSync().GetSession(); + const TString query = fmt::format( + R"( + CREATE OBJECT {data_source_name}_password (TYPE SECRET) WITH (value={password}); + + CREATE EXTERNAL DATA SOURCE {data_source_name} WITH ( + SOURCE_TYPE="{source_type}", + LOCATION="{endpoint}", + AUTH_METHOD="BASIC", + LOGIN="{login}", + DATABASE_NAME="{database}", + PASSWORD_SECRET_NAME="{data_source_name}_password", + USE_TLS="{use_tls}" + ); + )", + "data_source_name"_a = dataSourceName, + "login"_a = login, + "password"_a = password, + "use_tls"_a = useTls ? 
"TRUE" : "FALSE", + "source_type"_a = ToString(NYql::EDatabaseType::Ydb), + "endpoint"_a = endpoint, + "database"_a = databaseName); + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + std::shared_ptr MakeEmptyRecordBatch(size_t rowsCount) { return arrow::RecordBatch::Make( std::make_shared(arrow::FieldVector()), diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h index f26a8e4728ad..6f09b9e5ffab 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h @@ -54,6 +54,11 @@ namespace NYql::NConnector::NTest { return TClickHouseDataSourceInstanceBuilder( \ this->Result_->mutable_data_source_instance(), \ static_cast(this)); \ + } \ + TYdbDataSourceInstanceBuilder YdbDataSourceInstance() { \ + return TYdbDataSourceInstanceBuilder( \ + this->Result_->mutable_data_source_instance(), \ + static_cast(this)); \ } MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") { @@ -200,6 +205,15 @@ namespace NYql::NConnector::NTest { const TString& serviceAccountIdSignature = DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE, const TString& databaseName = DEFAULT_DATABASE); + void CreateYdbExternalDataSource( + const std::shared_ptr& kikimr, + const TString& dataSourceName = DEFAULT_DATA_SOURCE_NAME, + const TString& login = DEFAULT_LOGIN, + const TString& password = DEFAULT_PASSWORD, + const TString& endpoint = DEFAULT_YDB_ENDPOINT, + bool useTls = DEFAULT_USE_TLS, + const TString& databaseName = DEFAULT_DATABASE); + class TConnectorClientMock: public NYql::NConnector::IClient { public: MOCK_METHOD(TResult, DescribeTableImpl, (const NApi::TDescribeTableRequest& request)); @@ -277,6 +291,25 @@ namespace 
NYql::NConnector::NTest { } }; + template + struct TYdbDataSourceInstanceBuilder: public TBaseDataSourceInstanceBuilder, TParent> { + using TBase = TBaseDataSourceInstanceBuilder, TParent>; + + explicit TYdbDataSourceInstanceBuilder(NApi::TDataSourceInstance* result = nullptr, TParent* parent = nullptr) + : TBase(result, parent) + { + FillWithDefaults(); + } + + void FillWithDefaults() { + TBase::FillWithDefaults(); + this->Host(DEFAULT_YDB_HOST); + this->Port(DEFAULT_YDB_PORT); + this->Kind(NApi::EDataSourceKind::YDB); + this->Protocol(DEFAULT_YDB_PROTOCOL); + } + }; + template struct TDescribeTableResultBuilder: public TResponseBuilder { using TBuilder = TDescribeTableResultBuilder; diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp index b16aa541b43e..7eed47039379 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.cpp @@ -16,4 +16,7 @@ namespace NYql::NConnector::NTest { extern const TString DEFAULT_CH_CLUSTER_ID = "ch-managed"; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID = "sa"; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE = "sa_signature"; + + extern const TString DEFAULT_YDB_HOST = "localhost"; + extern const TString DEFAULT_YDB_ENDPOINT = TStringBuilder() << DEFAULT_YDB_HOST << ':' << DEFAULT_YDB_PORT; } // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h index f8316f7e9824..bbca9127a4bd 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/defaults.h @@ -25,4 +25,9 @@ namespace NYql::NConnector::NTest { constexpr NApi::EProtocol DEFAULT_CH_PROTOCOL = 
NApi::EProtocol::HTTP; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID; extern const TString DEFAULT_CH_SERVICE_ACCOUNT_ID_SIGNATURE; + + extern const TString DEFAULT_YDB_HOST; + constexpr int DEFAULT_YDB_PORT = 2136; + extern const TString DEFAULT_YDB_ENDPOINT; + constexpr NApi::EProtocol DEFAULT_YDB_PROTOCOL = NApi::EProtocol::NATIVE; } // namespace NYql::NConnector::NTest diff --git a/ydb/library/yql/providers/generic/proto/source.proto b/ydb/library/yql/providers/generic/proto/source.proto index 0911dd54ef26..060593a99c15 100644 --- a/ydb/library/yql/providers/generic/proto/source.proto +++ b/ydb/library/yql/providers/generic/proto/source.proto @@ -8,12 +8,17 @@ import "ydb/library/yql/providers/generic/connector/api/service/protos/connector import "ydb/library/yql/providers/generic/connector/api/common/data_source.proto"; message TSource { - // Token to access database - // FIXME: unused field, delete it: - string token = 1; // Prepared Select expression NYql.NConnector.NApi.TSelect select = 2; - // Description of instance to connect - // FIXME: DataSourceInstance is already incapsulated into select, delete it: - NYql.NConnector.NApi.TDataSourceInstance data_source_instance = 3; -} \ No newline at end of file + + // Credentials used to access managed databases APIs. + // When working with external data source instances deployed in clouds, + // one should either set (ServiceAccountId, ServiceAccountIdSignature) pair + // that will be resolved into IAM Token via Token Accessor, + // or provide IAM Token directly. 
+ string ServiceAccountId = 4; + string ServiceAccountIdSignature = 5; + string Token = 6; + + reserved 1, 3; +} diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp index 10d74e5fc6cc..48bb17d52670 100644 --- a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp +++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp @@ -243,6 +243,7 @@ struct TPushdownFixture: public NUnitTest::TBaseFixture { TypesCtx.Get(), FunctionRegistry.Get(), DatabaseResolver, + nullptr, GenericClient, GatewaysCfg.GetGeneric()); diff --git a/ydb/library/yql/providers/generic/provider/ya.make b/ydb/library/yql/providers/generic/provider/ya.make index ca9d84e6365b..55dd70b153e2 100644 --- a/ydb/library/yql/providers/generic/provider/ya.make +++ b/ydb/library/yql/providers/generic/provider/ya.make @@ -21,6 +21,8 @@ SRCS( yql_generic_settings.cpp yql_generic_state.h yql_generic_state.cpp + yql_generic_utils.h + yql_generic_utils.cpp ) YQL_LAST_ABI_VERSION() @@ -45,13 +47,16 @@ PEERDIR( ydb/library/yql/providers/common/provider ydb/library/yql/providers/common/pushdown ydb/library/yql/providers/common/structured_token + ydb/library/yql/providers/common/token_accessor/client ydb/library/yql/providers/common/transform ydb/library/yql/providers/dq/common ydb/library/yql/providers/dq/expr_nodes ydb/library/yql/providers/generic/expr_nodes ydb/library/yql/providers/generic/proto + ydb/library/yql/providers/generic/connector/api/common ydb/library/yql/providers/generic/connector/libcpp ydb/library/yql/utils/plan + ydb/public/sdk/cpp/client/ydb_types/credentials ) END() diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp index 900c2e37f93a..886b493076ee 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp +++ 
b/ydb/library/yql/providers/generic/provider/yql_generic_cluster_config.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -11,8 +12,6 @@ #include "yql_generic_cluster_config.h" namespace NYql { - using namespace NConnector; - using namespace NConnector::NApi; using namespace fmt::literals; void ParseLogin( @@ -20,7 +19,8 @@ namespace NYql { NYql::TGenericClusterConfig& clusterConfig) { auto it = properties.find("login"); if (it == properties.cend()) { - ythrow yexception() << "missing 'LOGIN' value"; + // It's OK not to have credentials for base auth + return; } if (!it->second) { @@ -35,7 +35,8 @@ namespace NYql { NYql::TGenericClusterConfig& clusterConfig) { auto it = properties.find("password"); if (it == properties.cend()) { - ythrow yexception() << "missing 'PASSWORD' value"; + // It's OK not to have credentials for base auth + return; } clusterConfig.MutableCredentials()->Mutablebasic()->Setpassword(it->second); @@ -157,6 +158,25 @@ namespace NYql { clusterConfig.SetDatabaseId(it->second); } + void ParseDatabaseId(const THashMap& properties, + NYql::TGenericClusterConfig& clusterConfig) { + auto it = properties.find("database_id"); + if (it == properties.cend()) { + return; + } + + if (!it->second) { + // DATABASE_ID is an optional field + return; + } + + if (!it->second) { + ythrow yexception() << "invalid 'DATABASE_ID' value: '" << it->second << "'"; + } + + clusterConfig.SetDatabaseId(it->second); + } + void ParseSourceType(const THashMap& properties, NYql::TGenericClusterConfig& clusterConfig) { auto it = properties.find("source_type"); @@ -172,6 +192,11 @@ namespace NYql { NYql::TGenericClusterConfig& clusterConfig) { using namespace NConnector::NApi; + if (clusterConfig.GetKind() == EDataSourceKind::YDB) { + clusterConfig.SetProtocol(EProtocol::NATIVE); + return; + } + auto it = properties.find("protocol"); if (it == properties.cend()) { ythrow yexception() << "missing 'PROTOCOL' value"; @@ -235,20 +260,6 @@ namespace NYql { } 
TGenericClusterConfig GenericClusterConfigFromProperties(const TString& clusterName, const THashMap& properties) { - // some cross-parameter validations - auto location = KeyIsSet(properties, "location"); - auto mdbClusterId = KeyIsSet(properties, "mdb_cluster_id"); - - if ((location && mdbClusterId) || (!location and !mdbClusterId)) { - ythrow yexception() << "you must provide either 'LOCATION' or 'MDB_CLUSTER_ID' parameter"; - } - - auto serviceAccountId = KeyIsSet(properties, "serviceAccountId"); - auto serviceAccountIdSignature = KeyIsSet(properties, "serviceAccountIdSignature"); - if ((serviceAccountId && !serviceAccountIdSignature) || (!serviceAccountId && serviceAccountIdSignature)) { - ythrow yexception() << "you must provide either both 'SERVICE_ACCOUNT_ID' and 'SERVICE_ACCOUNT_ID_SIGNATURE' parameters or none of them"; - } - NYql::TGenericClusterConfig clusterConfig; clusterConfig.set_name(clusterName); ParseLogin(properties, clusterConfig); @@ -258,6 +269,7 @@ namespace NYql { ParseDatabaseName(properties, clusterConfig); ParseSchema(properties, clusterConfig); ParseMdbClusterId(properties, clusterConfig); + ParseDatabaseId(properties, clusterConfig); ParseSourceType(properties, clusterConfig); ParseProtocol(properties, clusterConfig); ParseServiceAccountId(properties, clusterConfig); @@ -305,27 +317,16 @@ namespace NYql { "protocol"_a = NConnector::NApi::EProtocol_Name(clusterConfig.GetProtocol())); } + static const TSet managedDatabaseKinds{ + NConnector::NApi::EDataSourceKind::POSTGRESQL, + NConnector::NApi::EDataSourceKind::CLICKHOUSE, + NConnector::NApi::EDataSourceKind::YDB}; + void ValidateGenericClusterConfig( const NYql::TGenericClusterConfig& clusterConfig, const TString& context) { - // cross-parameter validations for optional fields - auto hasEndpoint = clusterConfig.HasEndpoint(); - auto databaseId = clusterConfig.GetDatabaseId(); - - if ((hasEndpoint && databaseId)) { - return ValidationError( - clusterConfig, - context, - "both 'Endpoint' 
and 'DatabaseId' fields are set; you must set only one of them"); - } - - if (!hasEndpoint and !databaseId) { - return ValidationError( - clusterConfig, - context, - "none of 'Endpoint' and 'DatabaseId' fields are set; you must set one of them"); - } - + // Service account ID and service account ID signature are tightly coupled: + // if one is set, another one must be set too. auto serviceAccountId = clusterConfig.GetServiceAccountId(); auto serviceAccountIdSignature = clusterConfig.GetServiceAccountIdSignature(); if (serviceAccountId && !serviceAccountIdSignature) { @@ -344,6 +345,8 @@ namespace NYql { "you must set either both 'ServiceAccountId' and 'ServiceAccountIdSignature' fields or none of them"); } + // Service account credentials and raw tokens are mutually exclusive: + // no need to specify service account parameters if one already has a token. auto token = clusterConfig.GetToken(); if ((serviceAccountId && serviceAccountIdSignature) && token) { return ValidationError( @@ -352,23 +355,60 @@ namespace NYql { "you must set either ('ServiceAccountId', 'ServiceAccountIdSignature') fields or 'Token' field or none of them"); } + // All managed databases: + // * set endpoint when working with on-prem instances + // * set database id when working with managed instances + if (managedDatabaseKinds.contains(clusterConfig.GetKind())) { + auto hasEndpoint = clusterConfig.HasEndpoint(); + auto hasDatabaseId = clusterConfig.HasDatabaseId(); + + if (hasEndpoint && hasDatabaseId) { + return ValidationError( + clusterConfig, + context, + "both 'Endpoint' and 'DatabaseId' fields are set; you must set only one of them"); + } + + if (!hasEndpoint and !hasDatabaseId) { + return ValidationError( + clusterConfig, + context, + "none of 'Endpoint' and 'DatabaseId' fields are set; you must set one of them"); + } + } + + // YDB: + // * set database name when working with on-prem YDB instance; + // * but set database ID when working with managed YDB. 
+ if (clusterConfig.GetKind() == NConnector::NApi::YDB) { + if (clusterConfig.HasDatabaseName() && clusterConfig.HasDatabaseId()) { + return ValidationError( + clusterConfig, + context, + "For YDB clusters you must set either database name or database id, but you have set both of them"); + } + + if (!clusterConfig.HasDatabaseName() && !clusterConfig.HasDatabaseId()) { + return ValidationError( + clusterConfig, + context, + "For YDB clusters you must set either database name or database id, but you have set none of them"); + } + } + // check required fields if (!clusterConfig.GetName()) { return ValidationError(clusterConfig, context, "empty field 'Name'"); } - if (clusterConfig.GetKind() == EDataSourceKind::DATA_SOURCE_KIND_UNSPECIFIED) { + if (clusterConfig.GetKind() == NConnector::NApi::EDataSourceKind::DATA_SOURCE_KIND_UNSPECIFIED) { return ValidationError(clusterConfig, context, "empty field 'Kind'"); } - if (!clusterConfig.GetCredentials().Getbasic().Getusername()) { - return ValidationError(clusterConfig, context, "empty field 'Credentials.basic.username'"); - } - // TODO: validate Credentials.basic.password after ClickHouse recipe fix // TODO: validate DatabaseName field during https://st.yandex-team.ru/YQ-2494 - if (clusterConfig.GetProtocol() == EProtocol::PROTOCOL_UNSPECIFIED) { + if (clusterConfig.GetProtocol() == NConnector::NApi::EProtocol::PROTOCOL_UNSPECIFIED) { return ValidationError(clusterConfig, context, "empty field 'Protocol'"); } } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index 73ceaa500c4d..74a6bd819177 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -99,11 +99,10 @@ namespace NYql { const auto settings = maybeSettings.Cast(); const auto& clusterName = source.DataSource().Cast().Cluster().StringValue(); 
const auto& table = settings.Table().StringValue(); - const auto& token = settings.Token().Name().StringValue(); - const auto& endpoint = State_->Configuration->ClusterNamesToClusterConfigs[clusterName].endpoint(); + const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName]; + const auto& endpoint = clusterConfig.endpoint(); - Generic::TSource srcDesc; - srcDesc.set_token(token); + Generic::TSource source; // for backward compability full path can be used (cluster_name.`db_name.table`) // TODO: simplify during https://st.yandex-team.ru/YQ-2494 @@ -126,7 +125,7 @@ namespace NYql { } // prepare select - auto select = srcDesc.mutable_select(); + auto select = source.mutable_select(); select->mutable_from()->set_table(TString(dbTable)); select->mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance); @@ -149,19 +148,31 @@ namespace NYql { } } - // store data source instance - srcDesc.mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance); + // Managed YDB supports access via IAM token. + // If exist, copy service account creds to obtain tokens during request execution phase. + // If exists, copy previously created token. 
+ if (clusterConfig.kind() == NConnector::NApi::EDataSourceKind::YDB) { + source.SetServiceAccountId(clusterConfig.GetServiceAccountId()); + source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature()); + source.SetToken(State_->Types->Credentials->FindCredentialContent( + "default_" + clusterConfig.name(), + "default_generic", + clusterConfig.GetToken())); + } // preserve source description for read actor - protoSettings.PackFrom(srcDesc); + protoSettings.PackFrom(source); - switch (srcDesc.data_source_instance().kind()) { + switch (select->data_source_instance().kind()) { case NYql::NConnector::NApi::CLICKHOUSE: sourceType = "ClickHouseGeneric"; break; case NYql::NConnector::NApi::POSTGRESQL: sourceType = "PostgreSqlGeneric"; break; + case NYql::NConnector::NApi::YDB: + sourceType = "YdbGeneric"; + break; default: ythrow yexception() << "Data source kind is unknown or not specified"; break; @@ -193,6 +204,9 @@ namespace NYql { case NConnector::NApi::POSTGRESQL: properties["SourceType"] = "PostgreSql"; break; + case NConnector::NApi::YDB: + properties["SourceType"] = "Ydb"; + break; case NConnector::NApi::DATA_SOURCE_KIND_UNSPECIFIED: break; default: diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp index 2089adbc798e..ae9e504f3c1b 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_io_discovery.cpp @@ -1,4 +1,6 @@ #include "yql_generic_provider_impl.h" +#include "yql_generic_utils.h" + #include #include #include @@ -91,6 +93,7 @@ namespace NYql { for (const auto& [databaseIdWithType, databaseDescription] : response.DatabaseDescriptionMap) { YQL_CLOG(INFO, ProviderGeneric) << "resolved database id into endpoint" << ": databaseId=" << databaseIdWithType.first + << ", databaseKind=" << databaseIdWithType.second << ", host=" << 
databaseDescription.Host << ", port=" << databaseDescription.Port; } @@ -157,7 +160,10 @@ namespace NYql { if (clusterConfigIter == clusterNamesToClusterConfigs.end()) { TIssues issues; - issues.AddIssue(TStringBuilder() << "no cluster names for database id " << databaseIdWithType.first << " and cluster name " << clusterName); + issues.AddIssue(TStringBuilder() << "no cluster names for database id " + << databaseIdWithType.first + << " and cluster name " + << clusterName); ctx.IssueManager.AddIssues(issues); return TStatus::Error; } @@ -165,6 +171,15 @@ namespace NYql { auto endpointDst = clusterConfigIter->second.mutable_endpoint(); endpointDst->set_host(databaseDescription.Host); endpointDst->set_port(databaseDescription.Port); + + // If we work with managed YDB, we find out database name + // only after database id (== cluster id) resolving. + if (clusterConfigIter->second.kind() == NConnector::NApi::EDataSourceKind::YDB) { + clusterConfigIter->second.set_databasename(databaseDescription.Database); + } + + YQL_CLOG(INFO, ProviderGeneric) << "ModifyClusterConfigs: " + << DumpGenericClusterConfig(clusterConfigIter->second); } } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index 8befd4fd7cf4..383e342c1523 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -14,10 +14,11 @@ #include #include #include -#include -#include +#include #include #include +#include +#include namespace NYql { using namespace NNodes; @@ -32,7 +33,8 @@ namespace NYql { }; class TGenericLoadTableMetadataTransformer: public TGraphTransformerBase { - using TMapType = std::unordered_map>; + using TMapType = + std::unordered_map>; public: TGenericLoadTableMetadataTransformer(TGenericState::TPtr state) @@ -48,42 +50,37 @@ namespace NYql { } std::unordered_set pendingTables; - const auto& 
reads = FindNodes(input, - [&](const TExprNode::TPtr& node) { - if (const auto maybeRead = TMaybeNode(node)) { - return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; - } - return false; - }); + const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) { + if (const auto maybeRead = TMaybeNode(node)) { + return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; + } + return false; + }); if (!reads.empty()) { for (const auto& r : reads) { const TGenRead read(r); if (!read.FreeArgs().Get(2).Ref().IsCallable("MrTableConcat")) { - ctx.AddError( - TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key")); + ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key")); return TStatus::Error; } const auto maybeKey = TExprBase(read.FreeArgs().Get(2).Ref().HeadPtr()).Maybe(); if (!maybeKey) { - ctx.AddError( - TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), TStringBuilder() << "Expected key")); + ctx.AddError(TIssue(ctx.GetPosition(read.FreeArgs().Get(0).Pos()), "Expected key")); return TStatus::Error; } const auto& keyArg = maybeKey.Cast().Ref().Head(); if (!keyArg.IsList() || keyArg.ChildrenSize() != 2U || !keyArg.Head().IsAtom("table") || !keyArg.Tail().IsCallable(TCoString::CallableName())) { - ctx.AddError( - TIssue(ctx.GetPosition(keyArg.Pos()), TStringBuilder() << "Expected single table name")); + ctx.AddError(TIssue(ctx.GetPosition(keyArg.Pos()), "Expected single table name")); return TStatus::Error; } const auto clusterName = read.DataSource().Cluster().StringValue(); const auto tableName = TString(keyArg.Tail().Head().Content()); if (pendingTables.insert(TGenericState::TTableAddress(clusterName, tableName)).second) { - YQL_CLOG(INFO, ProviderGeneric) - << "Loading table meta for: `" << clusterName << "`.`" << tableName << "`"; + YQL_CLOG(INFO, ProviderGeneric) << "Loading table meta for: `" << clusterName << "`.`" << tableName << "`"; } } } @@ 
-108,6 +105,7 @@ namespace NYql { auto desc = emplaceIt.first->second; desc->DataSourceInstance = request.data_source_instance(); + Y_ENSURE(State_->GenericClient); State_->GenericClient->DescribeTable(request).Subscribe( [desc = std::move(desc), promise = std::move(promise)](const NConnector::TDescribeTableAsyncResult& f1) mutable { NConnector::TDescribeTableAsyncResult f2(f1); @@ -196,14 +194,13 @@ namespace NYql { } else { const auto& error = response.error(); NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()), - TStringBuilder() - << "Loading metadata for table: " << clusterName << '.' << tableName); + TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName); hasErrors = true; break; } } else { - ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), - TStringBuilder() << "Not found result for " << clusterName << '.' << tableName)); + ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() + << "Not found result for " << clusterName << '.' 
<< tableName)); hasErrors = true; break; } @@ -222,10 +219,8 @@ namespace NYql { } private: - const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, - const std::string_view& cluster, - const std::string_view& table, TExprContext& ctx, - TVector& columnOrder) try { + const TStructExprType* ParseTableMeta(const NConnector::NApi::TSchema& schema, const std::string_view& cluster, + const std::string_view& table, TExprContext& ctx, TVector& columnOrder) try { TVector items; auto columns = schema.columns(); @@ -250,25 +245,81 @@ namespace NYql { return nullptr; } - void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, const TString& tablePath) { + void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, + const TString& tablePath) { const auto dataSourceKind = clusterConfig.GetKind(); auto dsi = request.mutable_data_source_instance(); - *dsi->mutable_endpoint() = clusterConfig.GetEndpoint(); dsi->set_kind(dataSourceKind); - *dsi->mutable_credentials() = clusterConfig.GetCredentials(); dsi->set_use_tls(clusterConfig.GetUseSsl()); dsi->set_protocol(clusterConfig.GetProtocol()); + FillCredentials(request, clusterConfig); FillTypeMappingSettings(request); FillDataSourceOptions(request, clusterConfig); FillTablePath(request, clusterConfig, tablePath); } + void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { + auto dsi = request.mutable_data_source_instance(); + + // If login/password is provided, just copy them into request: + // connector will use Basic Auth to access external data sources. + if (clusterConfig.GetCredentials().Hasbasic()) { + *dsi->mutable_credentials() = clusterConfig.GetCredentials(); + return; + } + + // If there are no Basic Auth parameters, two options can be considered: + + // 1. 
Client provided own IAM-token to access external data source + auto iamToken = State_->Types->Credentials->FindCredentialContent( + "default_" + clusterConfig.name(), + "default_generic", + clusterConfig.GetToken()); + if (iamToken) { + *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; + *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; + return; + } + + // 2. Client provided service account creds that must be converted into IAM-token + Y_ENSURE(State_->CredentialsFactory, "CredentialsFactory is not initialized"); + + auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth( + clusterConfig.GetServiceAccountId(), + clusterConfig.GetServiceAccountIdSignature()) + .ToJson(); + + Y_ENSURE(structuredTokenJSON, "empty structured token"); + + // Create provider or get existing one. + // It's crucial to reuse providers because their construction implies synchronous IO. + auto providersIt = State_->CredentialProviders.find(clusterConfig.name()); + if (providersIt == State_->CredentialProviders.end()) { + auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken( + State_->CredentialsFactory, + structuredTokenJSON, + false); + + providersIt = State_->CredentialProviders.emplace( + std::make_pair(clusterConfig.name(), credentialsProviderFactory->CreateProvider())) + .first; + } + + iamToken = providersIt->second->GetAuthInfo(); + Y_ENSURE(iamToken, "empty IAM token"); + + *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; + *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; + } + void FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { const auto dataSourceKind = clusterConfig.GetKind(); switch (dataSourceKind) { case NYql::NConnector::NApi::CLICKHOUSE: break; + case NYql::NConnector::NApi::YDB: + break; case NYql::NConnector::NApi::POSTGRESQL: { // for backward compability set 
schema "public" by default // TODO: simplify during https://st.yandex-team.ru/YQ-2494 @@ -285,13 +336,14 @@ namespace NYql { } break; default: - ythrow yexception() << "Unexpected data source kind: '" - << NYql::NConnector::NApi::EDataSourceKind_Name(dataSourceKind) << "'"; + ythrow yexception() << "Unexpected data source kind: '" << NYql::NConnector::NApi::EDataSourceKind_Name(dataSourceKind) + << "'"; } } void FillTypeMappingSettings(NConnector::NApi::TDescribeTableRequest& request) { - const TString dateTimeFormat = State_->Configuration->DateTimeFormat.Get().GetOrElse(TGenericSettings::TDefault::DateTimeFormat); + const TString dateTimeFormat = + State_->Configuration->DateTimeFormat.Get().GetOrElse(TGenericSettings::TDefault::DateTimeFormat); if (dateTimeFormat == "string") { request.mutable_type_mapping_settings()->set_date_time_format(NConnector::NApi::STRING_FORMAT); } else if (dateTimeFormat == "YQL") { @@ -301,7 +353,8 @@ namespace NYql { } } - void FillTablePath(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, const TString& tablePath) { + void FillTablePath(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, + const TString& tablePath) { // for backward compability full path can be used (cluster_name.`db_name.table`) // TODO: simplify during https://st.yandex-team.ru/YQ-2494 const auto dataSourceKind = clusterConfig.GetKind(); @@ -324,7 +377,7 @@ namespace NYql { dbNameTarget = "postgres"; break; default: - ythrow yexception() << "Unexpected data source kind: '" + ythrow yexception() << "You must provide database name explicitly for data source kind: '" << NYql::NConnector::NApi::EDataSourceKind_Name(dataSourceKind) << "'"; } } // else take database name from table path diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp index d2178ccd5b0e..c720e1b64d0e 100644 --- 
a/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.cpp @@ -6,13 +6,14 @@ namespace NYql { TDataProviderInitializer GetGenericDataProviderInitializer(NConnector::IClient::TPtr genericClient, - const std::shared_ptr dbResolver) + const IDatabaseAsyncResolver::TPtr& dbResolver, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) { - return [genericClient, dbResolver](const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, - const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - TIntrusivePtr randomProvider, TIntrusivePtr typeCtx, - const TOperationProgressWriter& progressWriter, const TYqlOperationOptions& operationOptions, - THiddenQueryAborter) + return [genericClient, dbResolver, credentialsFactory](const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, + const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, + TIntrusivePtr randomProvider, TIntrusivePtr typeCtx, + const TOperationProgressWriter& progressWriter, const TYqlOperationOptions& operationOptions, + THiddenQueryAborter) { Y_UNUSED(sessionId); Y_UNUSED(userName); @@ -25,6 +26,7 @@ namespace NYql { typeCtx.Get(), functionRegistry, dbResolver, + credentialsFactory, genericClient, gatewaysConfig->GetGeneric()); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h index d990b2084bb4..5c8e4c967a8c 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_provider.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_provider.h @@ -2,14 +2,14 @@ #include "yql_generic_state.h" -#include #include #include namespace NYql { TDataProviderInitializer GetGenericDataProviderInitializer( - NConnector::IClient::TPtr genericClient, // required - std::shared_ptr dbResolver = nullptr // can be missing in on-prem 
installations + NConnector::IClient::TPtr genericClient, // required + const IDatabaseAsyncResolver::TPtr& dbResolver = nullptr, // can be missing in on-prem installations + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory = nullptr // can be missing in on-prem installations ); TIntrusivePtr CreateGenericDataSource(TGenericState::TPtr state); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp index 0b4c93a8bf4d..1c2521573ddb 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.cpp @@ -1,5 +1,6 @@ #include "yql_generic_cluster_config.h" #include "yql_generic_settings.h" +#include "yql_generic_utils.h" #include #include @@ -34,7 +35,7 @@ namespace NYql { const TCredentials::TPtr& credentials) { ValidateGenericClusterConfig(clusterConfig, "TGenericConfiguration::AddCluster"); - YQL_CLOG(INFO, ProviderGeneric) << "generic provider add cluster: " << DumpGenericClusterConfig(clusterConfig); + YQL_CLOG(INFO, ProviderGeneric) << "GenericConfiguration::AddCluster: " << DumpGenericClusterConfig(clusterConfig); const auto& clusterName = clusterConfig.GetName(); const auto& databaseId = clusterConfig.GetDatabaseId(); @@ -95,23 +96,6 @@ namespace NYql { "or set (ServiceAccountId && ServiceAccountIdSignature) in cluster config"; } - TString TGenericConfiguration::DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) const { - TStringBuilder sb; - sb << "name = " << clusterConfig.GetName() - << ", kind = " << NConnector::NApi::EDataSourceKind_Name(clusterConfig.GetKind()) - << ", database name = " << clusterConfig.GetDatabaseName() - << ", database id = " << clusterConfig.GetName() - << ", endpoint = " << clusterConfig.GetEndpoint() - << ", use tls = " << clusterConfig.GetUseSsl() - << ", protocol = " << 
NConnector::NApi::EProtocol_Name(clusterConfig.GetProtocol()); - - for (const auto& [key, value] : clusterConfig.GetDataSourceOptions()) { - sb << ", " << key << " = " << value; - } - - return sb; - } - TGenericSettings::TConstPtr TGenericConfiguration::Snapshot() const { return std::make_shared(*this); } diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index d783963a6589..07a19c5ce827 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -25,26 +25,21 @@ namespace NYql { TGenericConfiguration(); TGenericConfiguration(const TGenericConfiguration&) = delete; - void Init(const NYql::TGenericGatewayConfig& gatewayConfig, - const std::shared_ptr databaseResolver, - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, - const TCredentials::TPtr& credentials); + void Init(const NYql::TGenericGatewayConfig& gatewayConfig, const std::shared_ptr databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, const TCredentials::TPtr& credentials); - void AddCluster(const TGenericClusterConfig& clusterConfig, - const std::shared_ptr databaseResolver, - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, - const TCredentials::TPtr& credentials); + void AddCluster(const TGenericClusterConfig& clusterConfig, const std::shared_ptr databaseResolver, + NYql::IDatabaseAsyncResolver::TDatabaseAuthMap& databaseAuth, const TCredentials::TPtr& credentials); TGenericSettings::TConstPtr Snapshot() const; bool HasCluster(TStringBuf cluster) const; private: TString MakeStructuredToken(const TGenericClusterConfig& clusterConfig, const TCredentials::TPtr& credentials) const; - TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) const; public: THashMap Tokens; THashMap ClusterNamesToClusterConfigs; // cluster name -> cluster config 
THashMap> DatabaseIdsToClusterNames; // database id -> cluster name }; -} +} //namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h index e2362bc5ad27..3d69efdfe0d3 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_state.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h @@ -3,7 +3,9 @@ #include "yql_generic_settings.h" #include +#include #include +#include namespace NKikimr::NMiniKQL { class IFunctionRegistry; @@ -29,13 +31,15 @@ namespace NYql { TGenericState( TTypeAnnotationContext* types, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - const std::shared_ptr& databaseResolver, + const std::shared_ptr& databaseResolver, + const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory, const NConnector::IClient::TPtr& genericClient, const TGenericGatewayConfig& gatewayConfig) : Types(types) , Configuration(MakeIntrusive()) , FunctionRegistry(functionRegistry) , DatabaseResolver(databaseResolver) + , CredentialsFactory(credentialsFactory) , GenericClient(genericClient) { Configuration->Init(gatewayConfig, databaseResolver, DatabaseAuth, types->Credentials); @@ -49,9 +53,15 @@ namespace NYql { TGenericConfiguration::TPtr Configuration = MakeIntrusive(); const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; - // key - (database id, database type), value - credentials to access MDB API - NYql::IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth; - std::shared_ptr DatabaseResolver; + // key - (database id, database type), value - credentials to access managed APIs + IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth; + std::shared_ptr DatabaseResolver; + + // key - cluster name, value - TCredentialsProviderPtr + // It's important to cache credentials providers, because they make IO + // (synchronous call via Token Accessor client) during the construction. 
+ std::unordered_map CredentialProviders; + ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; NConnector::IClient::TPtr GenericClient; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp new file mode 100644 index 000000000000..aba0b51924b3 --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_utils.cpp @@ -0,0 +1,22 @@ +#include "yql_generic_utils.h" + +#include + +namespace NYql { + TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig) { + TStringBuilder sb; + sb << "name = " << clusterConfig.GetName() + << ", kind = " << NConnector::NApi::EDataSourceKind_Name(clusterConfig.GetKind()) + << ", database name = " << clusterConfig.GetDatabaseName() + << ", database id = " << clusterConfig.GetDatabaseId() + << ", endpoint = " << clusterConfig.GetEndpoint() + << ", use tls = " << clusterConfig.GetUseSsl() + << ", protocol = " << NConnector::NApi::EProtocol_Name(clusterConfig.GetProtocol()); + + for (const auto& [key, value] : clusterConfig.GetDataSourceOptions()) { + sb << ", " << key << " = " << value; + } + + return sb; + } +} diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_utils.h b/ydb/library/yql/providers/generic/provider/yql_generic_utils.h new file mode 100644 index 000000000000..49c6bab7abca --- /dev/null +++ b/ydb/library/yql/providers/generic/provider/yql_generic_utils.h @@ -0,0 +1,8 @@ +#pragma once + +#include +#include + +namespace NYql { + TString DumpGenericClusterConfig(const TGenericClusterConfig& clusterConfig); +} diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index 7d71897cc07e..1ea9cb995e76 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -46,6 +46,7 @@ #include #include #include +#include #include #include #include @@ -92,6 +93,7 @@ #include #include #include +#include #include #ifdef 
PROFILE_MEMORY_ALLOCATIONS @@ -223,14 +225,20 @@ class TOptPipelineConfigurator : public IPipelineConfigurator { IOutputStream* TracePlan; }; -NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory(const NYdb::TDriver& driver, IHTTPGateway::TPtr httpGateway, NYql::NConnector::IClient::TPtr genericClient, size_t HTTPmaxTimeSeconds, size_t maxRetriesCount) { +NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory( + const NYdb::TDriver& driver, + IHTTPGateway::TPtr httpGateway, + NYql::NConnector::IClient::TPtr genericClient, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + size_t HTTPmaxTimeSeconds, + size_t maxRetriesCount) { auto factory = MakeIntrusive(); RegisterDqPqReadActorFactory(*factory, driver, nullptr); RegisterYdbReadActorFactory(*factory, driver, nullptr); RegisterS3ReadActorFactory(*factory, nullptr, httpGateway, GetHTTPDefaultRetryPolicy(TDuration::Seconds(HTTPmaxTimeSeconds), maxRetriesCount), {}, nullptr); RegisterS3WriteActorFactory(*factory, nullptr, httpGateway); RegisterClickHouseReadActorFactory(*factory, nullptr, httpGateway); - RegisterGenericReadActorFactory(*factory, nullptr, genericClient); + RegisterGenericReadActorFactory(*factory, credentialsFactory, genericClient); RegisterDqPqWriteActorFactory(*factory, driver, nullptr); @@ -409,6 +417,7 @@ int RunMain(int argc, const char* argv[]) TString mountConfig; TString mestricsPusherConfig; TString udfResolver; + TString tokenAccessorEndpoint; bool udfResolverFilterSyscalls = false; TString statFile; TString metricsFile; @@ -559,6 +568,10 @@ int RunMain(int argc, const char* argv[]) failureInjections[key] = std::make_pair(ui32(0), FromString(fail)); } }); + opts.AddLongOption("token-accessor-endpoint", "Network address of Token Accessor service in format grpc(s)://host:port") + .Optional() + .RequiredArgument("ENDPOINT") + .StoreResult(&tokenAccessorEndpoint); opts.AddHelpOption('h'); opts.SetFreeArgsNum(0); @@ -743,6 +756,16 @@ int RunMain(int argc, const char* argv[]) ); } + 
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory; + + if (tokenAccessorEndpoint) { + TVector ss = StringSplitter(tokenAccessorEndpoint).SplitByString("://"); + YQL_ENSURE(ss.size() == 2, "Invalid tokenAccessorEndpoint: " << tokenAccessorEndpoint); + + credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(ss[1], ss[0] == "grpcs", ""); + } + + NConnector::IClient::TPtr genericClient; if (gatewaysConfig.HasGeneric()) { for (auto& cluster : *gatewaysConfig.MutableGeneric()->MutableClusterMapping()) { @@ -750,7 +773,8 @@ int RunMain(int argc, const char* argv[]) } genericClient = NConnector::MakeClientGRPC(gatewaysConfig.GetGeneric().GetConnector()); - dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver)); + + dataProvidersInit.push_back(GetGenericDataProviderInitializer(genericClient, dbResolver, credentialsFactory)); } if (gatewaysConfig.HasYdb()) { @@ -818,11 +842,10 @@ int RunMain(int argc, const char* argv[]) size_t requestTimeout = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasRequestTimeoutSeconds() ? gatewaysConfig.GetHttpGateway().GetRequestTimeoutSeconds() : 100; size_t maxRetries = gatewaysConfig.HasHttpGateway() && gatewaysConfig.GetHttpGateway().HasMaxRetries() ? 
gatewaysConfig.GetHttpGateway().GetMaxRetries() : 2; - const bool enableSpilling = res.Has("enable-spilling"); - dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, - enableSpilling, CreateAsyncIoFactory(driver, httpGateway, genericClient, requestTimeout, maxRetries), threads, - metricsRegistry, - metricsPusherFactory); + bool enableSpilling = res.Has("enable-spilling"); + dqGateway = CreateLocalDqGateway(funcRegistry.Get(), dqCompFactory, dqTaskTransformFactory, dqTaskPreprocessorFactories, enableSpilling, + CreateAsyncIoFactory(driver, httpGateway, genericClient, credentialsFactory, requestTimeout, maxRetries), threads, + metricsRegistry, metricsPusherFactory); } gateways.emplace_back(dqGateway); diff --git a/ydb/library/yql/tools/dqrun/ya.make b/ydb/library/yql/tools/dqrun/ya.make index 5643a263eacf..c47f3877d79a 100644 --- a/ydb/library/yql/tools/dqrun/ya.make +++ b/ydb/library/yql/tools/dqrun/ya.make @@ -42,6 +42,7 @@ ENDIF() ydb/library/yql/providers/clickhouse/provider ydb/library/yql/providers/common/comp_nodes ydb/library/yql/providers/common/proto + ydb/library/yql/providers/common/token_accessor/client ydb/library/yql/providers/common/udf_resolve ydb/library/yql/providers/generic/actors ydb/library/yql/providers/generic/provider diff --git a/ydb/services/fq/ut_integration/fq_ut.cpp b/ydb/services/fq/ut_integration/fq_ut.cpp index 1b5764fee4cf..c011c80138d7 100644 --- a/ydb/services/fq/ut_integration/fq_ut.cpp +++ b/ydb/services/fq/ut_integration/fq_ut.cpp @@ -197,32 +197,6 @@ Y_UNIT_TEST_SUITE(Yq_1) { } } - Y_UNIT_TEST(Basic_EmptyTable) { - TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - UpsertToExistingTable(driver, location); - NYdb::NFq::TClient client(driver); - const TString folderId = 
"some_folder_id"; - { - const auto request = ::NFq::TCreateConnectionBuilder() - .SetName("testdbempty") - .CreateYdb("Root", location, "") - .Build(); - const auto result = client - .CreateConnection(request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); - } - - const TString queryId = CreateNewHistoryAndWaitFinish( - folderId, client, - "select count(*) from testdbempty.`yq/empty_table`", - FederatedQuery::QueryMeta::COMPLETED); - CheckGetResultData(client, queryId, folderId, 1, 1, 0); - } - Y_UNIT_TEST(Basic_EmptyList) { TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); ui16 grpc = server.GetPort(); @@ -256,32 +230,6 @@ Y_UNIT_TEST_SUITE(Yq_1) { CreateNewHistoryAndWaitFinish(folderId, client, "select null", expectedStatus); } - SIMPLE_UNIT_FORKED_TEST(Basic_Tagged) { - TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - NYdb::NFq::TClient client(driver); - const TString folderId = "some_folder_id"; - - - { - auto request = ::NFq::TCreateConnectionBuilder{} - .SetName("testdb00") - .CreateYdb("Root", location, "") - .Build(); - - auto result = client.CreateConnection( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } - - auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED; - CreateNewHistoryAndWaitFinish(folderId, client, "select AsTagged(count(*), \"tag\") from testdb00.`yq/connections`", expectedStatus); - } - Y_UNIT_TEST(Basic_TaggedLiteral) { TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); ui16 grpc = server.GetPort(); @@ -295,50 +243,6 @@ Y_UNIT_TEST_SUITE(Yq_1) { } // use fork for data test due to ch initialization problem - 
SIMPLE_UNIT_FORKED_TEST(ExtendedDatabaseId) { - TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - - NYdb::NFq::TClient client(driver); - const TString folderId = "folder_id_" + CreateGuidAsString(); - { - const auto request = ::NFq::TCreateConnectionBuilder() - .SetName("testdb01") - .CreateYdb("FakeDatabaseId", "") - .Build(); - const auto result = client - .CreateConnection(request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); - } - - { - const auto request = ::NFq::TCreateConnectionBuilder() - .SetName("testdb02") - .CreateYdb("FakeDatabaseId", "") - .Build(); - const auto result = client - .CreateConnection(request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); - } - - { - const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, - "select count(*) from testdb01.`yq/connections`", FederatedQuery::QueryMeta::COMPLETED); - CheckGetResultData(client, queryId, folderId, 1, 1, 2); - } - - { - // test connections db with 2 databaseId - const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, - "select count(*) from testdb02.`yq/connections`", FederatedQuery::QueryMeta::COMPLETED); - CheckGetResultData(client, queryId, folderId, 1, 1, 2); - } - } - Y_UNIT_TEST(DescribeConnection) { TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); ui16 grpc = server.GetPort(); @@ -855,70 +759,6 @@ Y_UNIT_TEST_SUITE(Yq_1) { } } -Y_UNIT_TEST_SUITE(Yq_2) { - SIMPLE_UNIT_FORKED_TEST(ReadFromYdbOverYq) { - TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); - ui16 grpc = server.GetPort(); - TString location = TStringBuilder() << "localhost:" << grpc; - auto driver = 
TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - NYdb::NFq::TClient client(driver); - const auto folderId = TString(__func__) + "folder_id"; - - { - auto request = ::NFq::TCreateConnectionBuilder{} - .SetName("testdb00") - .CreateYdb("Root", location, "") - .Build(); - - auto result = client.CreateConnection( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } - - TString queryId; - { - auto request = ::NFq::TCreateQueryBuilder{} - .SetText("select count(*) from testdb00.`yq/connections`") - .Build(); - auto result = client.CreateQuery( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - queryId = result.GetResult().query_id(); - } - - { - auto request = ::NFq::TDescribeQueryBuilder{}.SetQueryId(queryId).Build(); - auto result = DoWithRetryOnRetCode([&]() { - auto result = client.DescribeQuery( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - const auto status = result.GetResult().query().meta().status(); - PrintProtoIssues(result.GetResult().query().issue()); - return status == FederatedQuery::QueryMeta::COMPLETED; - }, TRetryOptions(10)); - UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); - } - - { - auto request = ::NFq::TGetResultDataBuilder{}.SetQueryId(queryId).Build(); - auto result = client.GetResultData( - request, CreateFqSettings(folderId)) - .ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - - const auto& resultSet = result.GetResult().result_set(); - UNIT_ASSERT_VALUES_EQUAL(resultSet.rows().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.columns().size(), 1); - 
UNIT_ASSERT_VALUES_EQUAL(resultSet.rows(0).items(0).uint64_value(), 1); - } - } -} - Y_UNIT_TEST_SUITE(PrivateApi) { Y_UNIT_TEST(PingTask) { TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);