Skip to content

Commit 076c929

Browse files
authored
New methods to provide IAM-tokens when accessing managed YDB from dqrun. (#2386)
1 parent 377a29f commit 076c929

File tree

11 files changed

+154
-66
lines changed

11 files changed

+154
-66
lines changed

ydb/library/yql/providers/common/proto/gateways_config.proto

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -563,12 +563,11 @@ message TGenericClusterConfig {
563563
// Credentials used to access data source instance
564564
optional NYql.NConnector.NApi.TCredentials Credentials = 10;
565565

566-
// Credentials used to access MDB API.
567-
// When working with data source instances deployed in a cloud,
568-
// you should either set (ServiceAccountId, ServiceAccountIdSignature) pair,
569-
// or set IAM Token.
570-
// The names of these fields must satisfy this template function:
571-
// https://github.com/ydb-platform/ydb/arcadia/contrib/ydb/core/fq/libs/actors/clusters_from_connections.cpp?rev=r11823087#L19
566+
// Credentials used to access managed databases APIs.
567+
// When working with external data source instances deployed in clouds,
568+
// one should either set (ServiceAccountId, ServiceAccountIdSignature) pair
569+
// that will be resolved into IAM Token via Token Accessor,
570+
// or provide IAM Token directly.
572571
optional string ServiceAccountId = 6;
573572
optional string ServiceAccountIdSignature = 7;
574573
optional string Token = 11;

ydb/library/yql/providers/generic/actors/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
yql_generic_read_actor.cpp
55
yql_generic_source_factory.cpp
6+
yql_generic_token_provider.cpp
67
)
78

89
PEERDIR(

ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp

Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "yql_generic_read_actor.h"
2+
#include "yql_generic_token_provider.h"
23

34
#include <ydb/library/actors/core/actor_bootstrapped.h>
45
#include <ydb/library/actors/core/actorsystem.h>
@@ -9,11 +10,9 @@
910
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
1011
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
1112
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
12-
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
1313
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
1414
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
1515
#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
16-
#include <ydb/library/yql/providers/generic/proto/range.pb.h>
1716
#include <ydb/library/yql/public/udf/arrow/util.h>
1817
#include <ydb/library/yql/utils/log/log.h>
1918
#include <ydb/library/yql/utils/yql_panic.h>
@@ -104,14 +103,14 @@ namespace NYql::NDq {
104103
ui64 inputIndex,
105104
TCollectStatsLevel statsLevel,
106105
NConnector::IClient::TPtr client,
107-
NYdb::TCredentialsProviderPtr credentialsProvider,
108-
NConnector::TSource&& source,
106+
TGenericTokenProvider::TPtr tokenProvider,
107+
Generic::TSource&& source,
109108
const NActors::TActorId& computeActorId,
110109
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
111110
: InputIndex_(inputIndex)
112111
, ComputeActorId_(computeActorId)
113112
, Client_(std::move(client))
114-
, CredentialsProvider_(std::move(credentialsProvider))
113+
, TokenProvider_(std::move(tokenProvider))
115114
, HolderFactory_(holderFactory)
116115
, Source_(source)
117116
{
@@ -146,7 +145,7 @@ namespace NYql::NDq {
146145
// Prepare request
147146
NConnector::NApi::TListSplitsRequest request;
148147
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
149-
MaybeRefreshToken(select.mutable_data_source_instance());
148+
TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());
150149
*request.mutable_selects()->Add() = std::move(select);
151150

152151
// Initialize stream
@@ -242,7 +241,7 @@ namespace NYql::NDq {
242241
Splits_.cbegin(), Splits_.cend(),
243242
[&](const NConnector::NApi::TSplit& split) {
244243
NConnector::NApi::TSplit splitCopy = split;
245-
MaybeRefreshToken(splitCopy.mutable_select()->mutable_data_source_instance());
244+
TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
246245
*request.mutable_splits()->Add() = std::move(split);
247246
});
248247

@@ -459,20 +458,6 @@ namespace NYql::NDq {
459458
return total;
460459
}
461460

462-
void MaybeRefreshToken(NConnector::NApi::TDataSourceInstance* dsi) const {
463-
if (!dsi->credentials().has_token()) {
464-
return;
465-
}
466-
467-
// Token may have expired. Refresh it.
468-
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");
469-
auto iamToken = CredentialsProvider_->GetAuthInfo();
470-
Y_ENSURE(iamToken, "empty IAM token");
471-
472-
*dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
473-
*dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM";
474-
}
475-
476461
// IActor & IDqComputeActorAsyncInput
477462
void PassAway() override { // Is called from Compute Actor
478463
YQL_CLOG(INFO, ProviderGeneric) << "PassAway :: final ingress stats"
@@ -505,7 +490,7 @@ namespace NYql::NDq {
505490
const NActors::TActorId ComputeActorId_;
506491

507492
NConnector::IClient::TPtr Client_;
508-
NYdb::TCredentialsProviderPtr CredentialsProvider_;
493+
TGenericTokenProvider::TPtr TokenProvider_;
509494
NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_;
510495
TVector<NConnector::NApi::TSplit> Splits_; // accumulated list of table splits
511496
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_;
@@ -514,12 +499,12 @@ namespace NYql::NDq {
514499

515500
NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_;
516501
const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
517-
NConnector::TSource Source_;
502+
Generic::TSource Source_;
518503
};
519504

520505
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>
521506
CreateGenericReadActor(NConnector::IClient::TPtr genericClient,
522-
NConnector::TSource&& source,
507+
Generic::TSource&& source,
523508
ui64 inputIndex,
524509
TCollectStatsLevel statsLevel,
525510
const THashMap<TString, TString>& /*secureParams*/,
@@ -548,24 +533,6 @@ namespace NYql::NDq {
548533
*/
549534

550535
// Obtain token to access remote data source if necessary
551-
NYdb::TCredentialsProviderPtr credentialProvider;
552-
if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
553-
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");
554-
555-
auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth(
556-
source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
557-
.ToJson();
558-
559-
// If service account is provided, obtain IAM-token
560-
Y_ENSURE(structuredTokenJSON, "empty structured token");
561-
562-
auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
563-
credentialsFactory,
564-
structuredTokenJSON,
565-
false);
566-
credentialProvider = credentialsProviderFactory->CreateProvider();
567-
}
568-
569536
// TODO: partitioning is not implemented now, but this code will be useful for the further research:
570537
/*
571538
TStringBuilder part;
@@ -579,11 +546,13 @@ namespace NYql::NDq {
579546
part << ';';
580547
*/
581548

549+
auto tokenProvider = CreateGenericTokenProvider(source, credentialsFactory);
550+
582551
const auto actor = new TGenericReadActor(
583552
inputIndex,
584553
statsLevel,
585554
genericClient,
586-
std::move(credentialProvider),
555+
std::move(tokenProvider),
587556
std::move(source),
588557
computeActorId,
589558
holderFactory);

ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
namespace NYql::NDq {
1010

1111
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*>
12-
CreateGenericReadActor(NConnector::IClient::TPtr genericClient, NConnector::TSource&& params, ui64 inputIndex,
12+
CreateGenericReadActor(NConnector::IClient::TPtr genericClient, Generic::TSource&& params, ui64 inputIndex,
1313
TCollectStatsLevel statsLevel, const THashMap<TString, TString>& secureParams,
1414
const THashMap<TString, TString>& taskParams, const NActors::TActorId& computeActorId,
1515
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,

ydb/library/yql/providers/generic/actors/yql_generic_source_factory.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ namespace NYql::NDq {
1010
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
1111
NYql::NConnector::IClient::TPtr genericClient) {
1212
auto genericFactory = [credentialsFactory, genericClient](
13-
NConnector::TSource&& settings,
13+
Generic::TSource&& settings,
1414
IDqAsyncIoFactory::TSourceArguments&& args) {
1515
return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel,
1616
args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory);
1717
};
1818

1919
for (auto& sourceName : {"ClickHouseGeneric", "PostgreSqlGeneric", "YdbGeneric"}) {
20-
factory.RegisterSource<NConnector::TSource>(sourceName, genericFactory);
20+
factory.RegisterSource<Generic::TSource>(sourceName, genericFactory);
2121
}
2222
}
2323

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
#include "yql_generic_token_provider.h"
2+
3+
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
4+
5+
namespace NYql::NDq {
6+
TGenericTokenProvider::TGenericTokenProvider(
7+
const NYql::Generic::TSource& source, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory)
8+
: Source_(source)
9+
, StaticIAMToken_(source.GetToken())
10+
, CredentialsProvider_(nullptr)
11+
{
12+
// 1. User has provided IAM-token itself.
13+
// This token will be used during the whole lifetime of a read actor.
14+
if (!StaticIAMToken_.empty()) {
15+
return;
16+
}
17+
18+
// 2. User has provided service account creds.
19+
// We create token accessor client that will renew token accessor by demand.
20+
if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
21+
Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");
22+
23+
auto structuredTokenJSON =
24+
TStructuredTokenBuilder()
25+
.SetServiceAccountIdAuth(source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
26+
.ToJson();
27+
28+
// If service account is provided, obtain IAM-token
29+
Y_ENSURE(structuredTokenJSON, "empty structured token");
30+
31+
auto credentialsProviderFactory =
32+
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, structuredTokenJSON, false);
33+
CredentialsProvider_ = credentialsProviderFactory->CreateProvider();
34+
}
35+
36+
// 3. If we reached this point, it means that user doesn't need token auth.
37+
}
38+
39+
void TGenericTokenProvider::MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const {
40+
// 1. Don't need tokens if basic auth is set
41+
if (dsi.credentials().has_basic()) {
42+
return;
43+
}
44+
45+
*dsi.mutable_credentials()->mutable_token()->mutable_type() = "IAM";
46+
47+
// 2. If static IAM-token has been provided, use it
48+
if (!StaticIAMToken_.empty()) {
49+
*dsi.mutable_credentials()->mutable_token()->mutable_value() = StaticIAMToken_;
50+
return;
51+
}
52+
53+
// 3. Otherwise use credentials provider to get token
54+
Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");
55+
56+
auto iamToken = CredentialsProvider_->GetAuthInfo();
57+
Y_ENSURE(iamToken, "CredentialsProvider returned empty IAM token");
58+
59+
*dsi.mutable_credentials()->mutable_token()->mutable_value() = std::move(iamToken);
60+
}
61+
62+
TGenericTokenProvider::TPtr
63+
CreateGenericTokenProvider(const NYql::Generic::TSource& source,
64+
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory) {
65+
return std::make_unique<TGenericTokenProvider>(source, credentialsFactory);
66+
}
67+
} //namespace NYql::NDq
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#pragma once
2+
3+
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
4+
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
5+
#include <ydb/library/yql/providers/generic/proto/source.pb.h>
6+
7+
namespace NYql::NDq {
8+
// When accessing external data sources using authentication via tokens,
9+
// there are two options:
10+
// 1. Use static IAM-token provided by user (especially useful during debugging);
11+
// 2. Use service account credentials in order to get (and refresh) IAM-token by demand.
12+
class TGenericTokenProvider {
13+
public:
14+
using TPtr = std::unique_ptr<TGenericTokenProvider>;
15+
16+
TGenericTokenProvider(const NYql::Generic::TSource& source,
17+
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
18+
19+
void MaybeFillToken(NConnector::NApi::TDataSourceInstance& dsi) const;
20+
21+
private:
22+
NYql::Generic::TSource Source_;
23+
TString StaticIAMToken_;
24+
NYdb::TCredentialsProviderPtr CredentialsProvider_;
25+
};
26+
27+
TGenericTokenProvider::TPtr
28+
CreateGenericTokenProvider(const NYql::Generic::TSource& source,
29+
const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory);
30+
} //namespace NYql::NDq

ydb/library/yql/providers/generic/proto/source.proto

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ syntax = "proto3";
22

33
option cc_enable_arenas = true;
44

5-
package NYql.NConnector;
5+
package NYql.Generic;
66

77
import "ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto";
88
import "ydb/library/yql/providers/generic/connector/api/common/data_source.proto";
@@ -11,11 +11,14 @@ message TSource {
1111
// Prepared Select expression
1212
NYql.NConnector.NApi.TSelect select = 2;
1313

14-
// ServiceAccountId and ServiceAccountIdSignature are used to obtain tokens
15-
// to access external data source supporting this kind of authentication
16-
// during the runtime phase.
14+
// Credentials used to access managed databases APIs.
15+
// When working with external data source instances deployed in clouds,
16+
// one should either set (ServiceAccountId, ServiceAccountIdSignature) pair
17+
// that will be resolved into IAM Token via Token Accessor,
18+
// or provide IAM Token directly.
1719
string ServiceAccountId = 4;
1820
string ServiceAccountIdSignature = 5;
21+
string Token = 6;
1922

2023
reserved 1, 3;
2124
}

ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ struct TFakeGenericClient: public NConnector::IClient {
144144

145145
class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
146146
public:
147-
explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, NConnector::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt)
147+
explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, Generic::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt)
148148
: TOptimizeTransformerBase(types, NLog::EComponent::ProviderGeneric, {})
149149
, DqSourceSettings_(dqSourceSettings)
150150
, DqSourceSettingsWereBuilt_(dqSourceSettingsWereBuilt)
@@ -182,13 +182,13 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase {
182182
TString sourceType;
183183
dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1);
184184
UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric");
185-
UNIT_ASSERT(settings.Is<NConnector::TSource>());
185+
UNIT_ASSERT(settings.Is<Generic::TSource>());
186186
settings.UnpackTo(DqSourceSettings_);
187187
*DqSourceSettingsWereBuilt_ = true;
188188
}
189189

190190
private:
191-
NConnector::TSource* DqSourceSettings_;
191+
Generic::TSource* DqSourceSettings_;
192192
bool* DqSourceSettingsWereBuilt_;
193193
};
194194

@@ -207,7 +207,7 @@ struct TPushdownFixture: public NUnitTest::TBaseFixture {
207207

208208
TAutoPtr<IGraphTransformer> Transformer;
209209
TAutoPtr<IGraphTransformer> BuildDqSourceSettingsTransformer;
210-
NConnector::TSource DqSourceSettings;
210+
Generic::TSource DqSourceSettings;
211211
bool DqSourceSettingsWereBuilt = false;
212212

213213
TExprNode::TPtr InitialExprRoot;

ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ namespace NYql {
102102
const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName];
103103
const auto& endpoint = clusterConfig.endpoint();
104104

105-
NConnector::TSource source;
105+
Generic::TSource source;
106106

107107
// for backward compability full path can be used (cluster_name.`db_name.table`)
108108
// TODO: simplify during https://st.yandex-team.ru/YQ-2494
@@ -149,10 +149,15 @@ namespace NYql {
149149
}
150150

151151
// Managed YDB supports access via IAM token.
152-
// Copy service account ids to obtain tokens during request execution phase.
152+
// If exist, copy service account creds to obtain tokens during request execution phase.
153+
// If exists, copy previously created token.
153154
if (clusterConfig.kind() == NConnector::NApi::EDataSourceKind::YDB) {
154155
source.SetServiceAccountId(clusterConfig.GetServiceAccountId());
155156
source.SetServiceAccountIdSignature(clusterConfig.GetServiceAccountIdSignature());
157+
source.SetToken(State_->Types->Credentials->FindCredentialContent(
158+
"default_" + clusterConfig.name(),
159+
"default_generic",
160+
clusterConfig.GetToken()));
156161
}
157162

158163
// preserve source description for read actor

0 commit comments

Comments
 (0)