Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions ydb/library/yql/providers/common/proto/gateways_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,11 @@ message TGenericClusterConfig {
// Credentials used to access data source instance
optional NYql.NConnector.NApi.TCredentials Credentials = 10;

// Credentials used to access MDB API.
// When working with data source instances deployed in a cloud,
// you should either set (ServiceAccountId, ServiceAccountIdSignature) pair,
// or set IAM Token.
// The names of these fields must satisfy this template function:
// https://github.com/ydb-platform/ydb/arcadia/contrib/ydb/core/fq/libs/actors/clusters_from_connections.cpp?rev=r11823087#L19
// Credentials used to access managed databases APIs.
// When working with external data source instances deployed in clouds,
// one should either set (ServiceAccountId, ServiceAccountIdSignature) pair
// that will be resolved into IAM Token via Token Accessor,
// or provide IAM Token directly.
optional string ServiceAccountId = 6;
optional string ServiceAccountIdSignature = 7;
optional string Token = 11;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/generic/actors/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ LIBRARY()
SRCS(
yql_generic_read_actor.cpp
yql_generic_source_factory.cpp
yql_generic_token_provider.cpp
)

PEERDIR(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "yql_generic_read_actor.h"
#include "yql_generic_token_provider.h"

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/actorsystem.h>
Expand All @@ -9,11 +10,9 @@
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
#include <ydb/library/yql/providers/generic/proto/range.pb.h>
#include <ydb/library/yql/public/udf/arrow/util.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/yql_panic.h>
Expand Down Expand Up @@ -104,14 +103,14 @@ namespace NYql::NDq {
ui64 inputIndex,
TCollectStatsLevel statsLevel,
NConnector::IClient::TPtr client,
NYdb::TCredentialsProviderPtr credentialsProvider,
NConnector::TSource&& source,
TGenericTokenProvider::TPtr tokenProvider,
Generic::TSource&& source,
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
: InputIndex_(inputIndex)
, ComputeActorId_(computeActorId)
, Client_(std::move(client))
, CredentialsProvider_(std::move(credentialsProvider))
, TokenProvider_(std::move(tokenProvider))
, HolderFactory_(holderFactory)
, Source_(source)
{
Expand Down Expand Up @@ -146,7 +145,7 @@ namespace NYql::NDq {
// Prepare request
NConnector::NApi::TListSplitsRequest request;
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
MaybeRefreshToken(select.mutable_data_source_instance());
TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());
*request.mutable_selects()->Add() = std::move(select);

// Initialize stream
Expand Down Expand Up @@ -242,7 +241,7 @@ namespace NYql::NDq {
Splits_.cbegin(), Splits_.cend(),
[&](const NConnector::NApi::TSplit& split) {
NConnector::NApi::TSplit splitCopy = split;
MaybeRefreshToken(splitCopy.mutable_select()->mutable_data_source_instance());
TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
*request.mutable_splits()->Add() = std::move(split);
});

Expand Down Expand Up @@ -459,20 +458,6 @@ namespace NYql::NDq {
return total;
}

void MaybeRefreshToken(NConnector::NApi::TDataSourceInstance* dsi) const {
if (!dsi->credentials().has_token()) {
return;
}

// Token may have expired. Refresh it.
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");
auto iamToken = CredentialsProvider_->GetAuthInfo();
Y_ENSURE(iamToken, "empty IAM token");

*dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
*dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM";
}

// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
YQL_CLOG(INFO, ProviderGeneric) << "PassAway :: final ingress stats"
Expand Down Expand Up @@ -505,7 +490,7 @@ namespace NYql::NDq {
const NActors::TActorId ComputeActorId_;

NConnector::IClient::TPtr Client_;
NYdb::TCredentialsProviderPtr CredentialsProvider_;
TGenericTokenProvider::TPtr TokenProvider_;
NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_;
TVector<NConnector::NApi::TSplit> Splits_; // accumulated list of table splits
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_;
Expand All @@ -514,12 +499,12 @@ namespace NYql::NDq {

NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
NConnector::TSource Source_;
Generic::TSource Source_;
};

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>
CreateGenericReadActor(NConnector::IClient::TPtr genericClient,
NConnector::TSource&& source,
Generic::TSource&& source,
ui64 inputIndex,
TCollectStatsLevel statsLevel,
const THashMap<TString, TString>& /*secureParams*/,
Expand Down Expand Up @@ -548,24 +533,6 @@ namespace NYql::NDq {
*/

// Obtain token to access remote data source if necessary
NYdb::TCredentialsProviderPtr credentialProvider;
if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");

auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth(
source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
.ToJson();

// If service account is provided, obtain IAM-token
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
credentialsFactory,
structuredTokenJSON,
false);
credentialProvider = credentialsProviderFactory->CreateProvider();
}

// TODO: partitioning is not implemented now, but this code will be useful for the further research:
/*
TStringBuilder part;
Expand All @@ -579,11 +546,13 @@ namespace NYql::NDq {
part << ';';
*/

auto tokenProvider = CreateGenericTokenProvider(source, credentialsFactory);

const auto actor = new TGenericReadActor(
inputIndex,
statsLevel,
genericClient,
std::move(credentialProvider),
std::move(tokenProvider),
std::move(source),
computeActorId,
holderFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace NYql::NDq {

std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*>
CreateGenericReadActor(NConnector::IClient::TPtr genericClient, NConnector::TSource&& params, ui64 inputIndex,
CreateGenericReadActor(NConnector::IClient::TPtr genericClient, Generic::TSource&& params, ui64 inputIndex,
TCollectStatsLevel statsLevel, const THashMap<TString, TString>& secureParams,
const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ namespace NYql::NDq {
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::NConnector::IClient::TPtr genericClient) {
auto genericFactory = [credentialsFactory, genericClient](
NConnector::TSource&& settings,
Generic::TSource&& settings,
IDqAsyncIoFactory::TSourceArguments&& args) {
return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel,
args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory);
};

for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric"}) {
factory.RegisterSource<NConnector::TSource>(sourceName, genericFactory);
factory.RegisterSource<Generic::TSource>(sourceName, genericFactory);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include "yql_generic_token_provider.h"

#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>

namespace NYql::NDq {
TGenericTokenProvider::TGenericTokenProvider(
const NYql::Generic::TSource& source, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory)
: Source_(source)
, StaticIAMToken_(source.GetToken())
, CredentialsProvider_(nullptr)
{
// 1. User has provided IAM-token itself.
// This token will be used during the whole lifetime of a read actor.
if (!StaticIAMToken_.empty()) {
return;
}

// 2. User has provided service account creds.
// We create token accessor client that will renew token accessor by demand.
if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");

auto structuredTokenJSON =
TStructuredTokenBuilder()
.SetServiceAccountIdAuth(source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
.ToJson();

// If service account is provided, obtain IAM-token
Y_ENSURE(structuredTokenJSON, "empty structured token");

auto credentialsProviderFactory =
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false);
CredentialsProvider_ = credentialsProviderFactory->CreateProvider();
}

// 3. If we reached this point, it means that user doesn't need token auth.
}

void TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const {
// 1. Don't need tokens if basic auth is set
if (dsi.credentials().has_basic()) {
return;
}

*dsi.mutable_credentials()->mutable_token()->mutable_type() = "IAM";

// 2. If static IAM-token has been provided, use it
if (!StaticIAMToken_.empty()) {
*dsi.mutable_credentials()->mutable_token()->mutable_value() = StaticIAMToken_;
return;
}

// 3. Otherwise use credentials provider to get token
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");

auto iamToken = CredentialsProvider_->GetAuthInfo();
Y_ENSURE(iamToken, "CredentialsProvider returned empty IAM token");

*dsi.mutable_credentials()->mutable_token()->mutable_value() = std::move(iamToken);
}

TGenericTokenProvider::TPtr
CreateGenericTokenProvider(const NYql::Generic::TSource& source,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
return std::make_unique<TGenericTokenProvider>(source, credentialsFactory);
}
} //namespace NYql::NDq
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
#include <ydb/library/yql/providers/generic/proto/source.pb.h>

namespace NYql::NDq {
// When accessing external data sources using authentication via tokens,
// there are two options:
// 1. Use static IAM-token provided by user (especially useful during debugging);
// 2. Use service account credentials in order to get (and refresh) IAM-token by demand.
class TGenericTokenProvider {
public:
using TPtr = std::unique_ptr<TGenericTokenProvider>;

TGenericTokenProvider(const NYql::Generic::TSource& source,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);

void MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const;

private:
NYql::Generic::TSource Source_;
TString StaticIAMToken_;
NYdb::TCredentialsProviderPtr CredentialsProvider_;
};

TGenericTokenProvider::TPtr
CreateGenericTokenProvider(const NYql::Generic::TSource& source,
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
} //namespace NYql::NDq
11 changes: 7 additions & 4 deletions ydb/library/yql/providers/generic/proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ syntax = "proto3";

option cc_enable_arenas = true;

package NYql.NConnector;
package NYql.Generic;

import "ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto";
import "ydb/library/yql/providers/generic/connector/api/common/data_source.proto";
Expand All @@ -11,11 +11,14 @@ message TSource {
// Prepared Select expression
NYql.NConnector.NApi.TSelect select = 2;

// ServiceAccountId and ServiceAccountIdSignature are used to obtain tokens
// to access external data source supporting this kind of authentication
// during the runtime phase.
// Credentials used to access managed databases APIs.
// When working with external data source instances deployed in clouds,
// one should either set (ServiceAccountId, ServiceAccountIdSignature) pair
// that will be resolved into IAM Token via Token Accessor,
// or provide IAM Token directly.
string ServiceAccountId = 4;
string ServiceAccountIdSignature = 5;
string Token = 6;

reserved 1, 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ struct TFakeGenericClient: public NConnector::IClient {

class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
public:
explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, NConnector::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt)
explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, Generic::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt)
: TOptimizeTransformerBase(types, NLog::EComponent::ProviderGeneric, {})
, DqSourceSettings_(dqSourceSettings)
, DqSourceSettingsWereBuilt_(dqSourceSettingsWereBuilt)
Expand Down Expand Up @@ -182,13 +182,13 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
TString sourceType;
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1);
UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric");
UNIT_ASSERT(settings.Is<NConnector::TSource>());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Что с обратной совместимостью? Из any можно будет достаточно и старый и новый протобуф?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не уверен, что однозначно понял вопрос, но 100% ответа у меня в любом случае нет. Поэтому вернул название неймспейса к тому виду, который был у нас всегда вплоть до предыдущего PR, вмёрженного в пятницу.

UNIT_ASSERT(settings.Is<Generic::TSource>());
settings.UnpackTo(DqSourceSettings_);
*DqSourceSettingsWereBuilt_ = true;
}

private:
NConnector::TSource* DqSourceSettings_;
Generic::TSource* DqSourceSettings_;
bool* DqSourceSettingsWereBuilt_;
};

Expand All @@ -207,7 +207,7 @@ struct TPushdownFixture: public NUnitTest::TBaseFixture {

TAutoPtr<IGraphTransformer> Transformer;
TAutoPtr<IGraphTransformer> BuildDqSourceSettingsTransformer;
NConnector::TSource DqSourceSettings;
Generic::TSource DqSourceSettings;
bool DqSourceSettingsWereBuilt = false;

TExprNode::TPtr InitialExprRoot;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ namespace NYql {
const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName];
const auto& endpoint = clusterConfig.endpoint();

NConnector::TSource source;
Generic::TSource source;

// for backward compability full path can be used (cluster_name.`db_name.table`)
// TODO: simplify during https://st.yandex-team.ru/YQ-2494
Expand Down Expand Up @@ -149,10 +149,15 @@ namespace NYql {
}

// Managed YDB supports access via IAM token.
// Copy service account ids to obtain tokens during request execution phase.
// If exist, copy service account creds to obtain tokens during request execution phase.
// If exists, copy previously created token.
if (clusterConfig.kind() == NConnector::NApi::EDataSourceKind::YDB) {
source.SetServiceAccountId(clusterConfig.GetServiceAccountId());
source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature());
source.SetToken(State_->Types->Credentials->FindCredentialContent(
"default_" + clusterConfig.name(),
"default_generic",
clusterConfig.GetToken()));
}

// preserve source description for read actor
Expand Down
Loading