diff --git a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp index 34647372530b..04ffd69dc70c 100644 --- a/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/generic_ut/kqp_generic_provider_ut.cpp @@ -110,9 +110,6 @@ namespace NKikimr::NKqp { clientMock->ExpectListSplits() .Select() .DataSourceInstance(dataSourceInstance) - .What() - .Column("col1", Ydb::Type::UINT16) - .Done() .Done() .Result() .AddResponse(NewSuccess()) @@ -209,9 +206,6 @@ namespace NKikimr::NKqp { clientMock->ExpectListSplits() .Select() .DataSourceInstance(dataSourceInstance) - .What() - // Empty - .Done() .Done() .Result() .AddResponse(NewSuccess()) @@ -302,9 +296,6 @@ namespace NKikimr::NKqp { clientMock->ExpectListSplits() .Select() .DataSourceInstance(dataSourceInstance) - .What() - // Empty - .Done() .Done() .Result() .AddResponse(NewSuccess()) @@ -375,8 +366,12 @@ namespace NKikimr::NKqp { auto clientMock = std::make_shared(); const NYql::TGenericDataSourceInstance dataSourceInstance = MakeDataSourceInstance(providerType); + // clang-format off - const NApi::TSelect select = TConnectorClientMock::TSelectBuilder<>() + const NApi::TSelect selectInListSplits = TConnectorClientMock::TSelectBuilder<>() + .DataSourceInstance(dataSourceInstance).GetResult(); + + const NApi::TSelect selectInReadSplits = TConnectorClientMock::TSelectBuilder<>() .DataSourceInstance(dataSourceInstance) .What() .NullableColumn("data_column", Ydb::Type::STRING) @@ -406,11 +401,11 @@ namespace NKikimr::NKqp { // step 2: ListSplits // clang-format off clientMock->ExpectListSplits() - .Select(select) + .Select(selectInListSplits) .Result() .AddResponse(NewSuccess()) .Description("some binary description") - .Select(select); + .Select(selectInReadSplits); // clang-format on // step 3: ReadSplits @@ -424,7 +419,7 @@ namespace NKikimr::NKqp { .Filtering(NYql::NConnector::NApi::TReadSplitsRequest::FILTERING_OPTIONAL) .Split() .Description("some binary description") - .Select(select) + .Select(selectInReadSplits) .Done() .Result() .AddResponse(MakeRecordBatch( diff --git a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp index 95b057e7d304..4426d9e3a2d7 100644 --- a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp +++ b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp @@ -160,7 +160,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { ; // clang-format on - NYql::Generic::TLookupSource lookupSourceSettings; + NYql::NGeneric::TLookupSource lookupSourceSettings; *lookupSourceSettings.mutable_data_source_instance() = dsi; lookupSourceSettings.Settable("lookup_test"); lookupSourceSettings.SetServiceAccountId("testsaid"); @@ -351,7 +351,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { } // clang-format on - NYql::Generic::TLookupSource lookupSourceSettings; + NYql::NGeneric::TLookupSource lookupSourceSettings; *lookupSourceSettings.mutable_data_source_instance() = dsi; lookupSourceSettings.Settable("lookup_test"); lookupSourceSettings.SetServiceAccountId("testsaid"); diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index 321802fa9dfe..4f4562ae88ab 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -69,7 +68,7 @@ namespace NYql::NDq { ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, - NYql::Generic::TLookupSource&& lookupSource, + NYql::NGeneric::TLookupSource&& lookupSource, const NKikimr::NMiniKQL::TStructType* keyType, const NKikimr::NMiniKQL::TStructType* payloadType, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, @@ -500,7 +499,7 @@ namespace NYql::NDq { const NActors::TActorId ParentId; std::shared_ptr Alloc; std::shared_ptr KeyTypeHelper; - const NYql::Generic::TLookupSource LookupSource; + const NYql::NGeneric::TLookupSource LookupSource; const NKikimr::NMiniKQL::TStructType* const KeyType; const NKikimr::NMiniKQL::TStructType* const PayloadType; const NKikimr::NMiniKQL::TStructType* const SelectResultType; // columns from KeyType + PayloadType @@ -529,7 +528,7 @@ namespace NYql::NDq { ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, - NYql::Generic::TLookupSource&& lookupSource, + NYql::NGeneric::TLookupSource&& lookupSource, const NKikimr::NMiniKQL::TStructType* keyType, const NKikimr::NMiniKQL::TStructType* payloadType, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h index 45dda2c67acf..06a2bf153bf6 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h @@ -18,7 +18,7 @@ namespace NYql::NDq { ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, - NYql::Generic::TLookupSource&& lookupSource, + NYql::NGeneric::TLookupSource&& lookupSource, const NKikimr::NMiniKQL::TStructType* keyType, const NKikimr::NMiniKQL::TStructType* payloadType, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp index 243e9a591a17..ddd0a655614d 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp @@ -11,13 +11,23 @@ namespace NYql::NDq { ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NYql::NConnector::IClient::TPtr genericClient) { auto readActorFactory = [credentialsFactory, genericClient]( - Generic::TSource&& settings, + NGeneric::TSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { - return CreateGenericReadActor(genericClient, std::move(settings), args.InputIndex, args.StatsLevel, - args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, args.HolderFactory); + return CreateGenericReadActor( + genericClient, + std::move(settings), + args.InputIndex, + args.StatsLevel, + args.SecureParams, + args.TaskId, + args.TaskParams, + args.ReadRanges, + args.ComputeActorId, + credentialsFactory, + args.HolderFactory); }; - auto lookupActorFactory = [credentialsFactory, genericClient](NYql::Generic::TLookupSource&& lookupSource, IDqAsyncIoFactory::TLookupSourceArguments&& args) { + auto lookupActorFactory = [credentialsFactory, genericClient](NYql::NGeneric::TLookupSource&& lookupSource, IDqAsyncIoFactory::TLookupSourceArguments&& args) { return CreateGenericLookupActor( genericClient, credentialsFactory, @@ -34,17 +44,16 @@ namespace NYql::NDq { }; for (auto& name : { - "ClickHouseGeneric", - "PostgreSqlGeneric", - "YdbGeneric", - "MySqlGeneric", - "GreenplumGeneric", - "MsSQLServerGeneric", - "OracleGeneric", - "LoggingGeneric"} - ) { - factory.RegisterSource(name, readActorFactory); - factory.RegisterLookupSource(name, lookupActorFactory); + "ClickHouseGeneric", + "PostgreSqlGeneric", + "YdbGeneric", + "MySqlGeneric", + "GreenplumGeneric", + "MsSQLServerGeneric", + "OracleGeneric", + "LoggingGeneric"}) { + factory.RegisterSource(name, readActorFactory); + factory.RegisterLookupSource(name, lookupActorFactory); } } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 1a2a5a45d57a..d71a8d783f6e 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -2,22 +2,23 @@ #include "yql_generic_read_actor.h" #include "yql_generic_token_provider.h" +#include #include #include #include #include #include #include -#include -#include -#include #include #include #include +#include +#include +#include +#include #include #include #include -#include namespace NYql::NDq { @@ -44,13 +45,15 @@ namespace NYql::NDq { TCollectStatsLevel statsLevel, NConnector::IClient::TPtr client, TGenericTokenProvider::TPtr tokenProvider, - Generic::TSource&& source, + NGeneric::TSource&& source, const NActors::TActorId& computeActorId, - const NKikimr::NMiniKQL::THolderFactory& holderFactory) + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + TVector&& partitions) : InputIndex_(inputIndex) , ComputeActorId_(computeActorId) , Client_(std::move(client)) , TokenProvider_(std::move(tokenProvider)) + , Partitions_(std::move(partitions)) , HolderFactory_(holderFactory) , Source_(source) { @@ -59,7 +62,7 @@ namespace NYql::NDq { void Bootstrap() { Become(&TGenericReadActor::StateFunc); - auto issue = InitSplitsListing(); + auto issue = InitSplitsReading(); if (issue) { return NotifyComputeActorWithIssue( TActivationContext::ActorSystem(), @@ -72,145 +75,50 @@ namespace NYql::NDq { static constexpr char ActorName[] = "GENERIC_READ_ACTOR"; private: - // TODO: make two different states // clang-format off STRICT_STFUNC(StateFunc, - hFunc(TEvListSplitsIterator, Handle); - hFunc(TEvListSplitsPart, Handle); - hFunc(TEvListSplitsFinished, Handle); hFunc(TEvReadSplitsIterator, Handle); hFunc(TEvReadSplitsPart, Handle); hFunc(TEvReadSplitsFinished, Handle); ) // clang-format on - // ListSplits - - TMaybe InitSplitsListing() { - YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits listing"; - - // Prepare request - NConnector::NApi::TListSplitsRequest request; - NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source - - auto error = TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance()); - if (error) { - return TIssue(error); - } - - *request.mutable_selects()->Add() = std::move(select); - - // Initialize stream - Client_->ListSplits(request).Subscribe( - [actorSystem = TActivationContext::ActorSystem(), - selfId = SelfId(), - computeActorId = ComputeActorId_, - inputIndex = InputIndex_]( - const NConnector::TListSplitsStreamIteratorAsyncResult& future) { - AwaitIterator< - NConnector::TListSplitsStreamIteratorAsyncResult, - TEvListSplitsIterator>( - actorSystem, selfId, computeActorId, inputIndex, future); - }); - - return Nothing(); - } - - void Handle(TEvListSplitsIterator::TPtr& ev) { - ListSplitsIterator_ = std::move(ev->Get()->Iterator); - - AwaitNextStreamItem(ListSplitsIterator_); - } - - void Handle(TEvListSplitsPart::TPtr& ev) { - auto& response = ev->Get()->Response; - YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvListSplitsPart :: event handling started" - << ": splits_size=" << response.splits().size(); - - if (!NConnector::IsSuccess(response)) { - return NotifyComputeActorWithError( - TActivationContext::ActorSystem(), - ComputeActorId_, - InputIndex_, - response.error()); - } - - // Save splits for the further usage - Splits_.insert( - Splits_.end(), - std::move_iterator(response.mutable_splits()->begin()), - std::move_iterator(response.mutable_splits()->end())); - - // ask for next stream message - AwaitNextStreamItem(ListSplitsIterator_); - - YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvListSplitsPart :: event handling finished"; - } - - void Handle(TEvListSplitsFinished::TPtr& ev) { - const auto& status = ev->Get()->Status; - - YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvListSplitsFinished :: event handling started: "; - - // Server sent EOF, now we are ready to start splits reading - if (NConnector::GrpcStatusEndOfStream(status)) { - YQL_CLOG(DEBUG, ProviderGeneric) << "Handle :: EvListSplitsFinished :: last message was reached, start data reading"; - auto issue = InitSplitsReading(); - if (issue) { - return NotifyComputeActorWithIssue( - TActivationContext::ActorSystem(), - ComputeActorId_, - InputIndex_, - std::move(*issue)); - } - - return; - } - - // Server temporary failure - if (NConnector::GrpcStatusNeedsRetry(status)) { - YQL_CLOG(WARN, ProviderGeneric) << "Handle :: EvListSplitsFinished :: you should retry your operation due to '" - << status.ToDebugString() << "' error"; - // TODO: retry - } - - return NotifyComputeActorWithError( - TActivationContext::ActorSystem(), - ComputeActorId_, - InputIndex_, - NConnector::ErrorFromGRPCStatus(status)); - } - // ReadSplits TMaybe InitSplitsReading() { YQL_CLOG(DEBUG, ProviderGeneric) << "Start splits reading"; - if (Splits_.empty()) { - YQL_CLOG(WARN, ProviderGeneric) << "Accumulated empty list of splits"; + if (Partitions_.empty()) { + YQL_CLOG(WARN, ProviderGeneric) << "Got empty list of partitions"; ReadSplitsFinished_ = true; NotifyComputeActorWithData(); return Nothing(); } - // Prepare request + // Prepare ReadSplits request. For the sake of simplicity, + // all the splits from all partitions will be packed into a single ReadSplits call. + // There's a lot of space for the optimizations here. NConnector::NApi::TReadSplitsRequest request; request.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING); request.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_OPTIONAL); - request.mutable_splits()->Reserve(Splits_.size()); - for (const auto& split : Splits_) { - NConnector::NApi::TSplit splitCopy = split; + for (const auto& partition : Partitions_) { + request.mutable_splits()->Reserve(request.splits().size() + partition.splits().size()); - auto error = TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance()); - if (error) { - return TIssue(std::move(error)); - } + for (const auto& srcSplit : partition.splits()) { + auto dstSplit = request.add_splits(); + + // Take actual SQL request from the source, because it contains predicates + *dstSplit->mutable_select() = Source_.select(); + + // Take split description from task params + dstSplit->set_description(srcSplit.description()); - *request.mutable_splits()->Add() = std::move(splitCopy); + // Assign actual IAM token to a split + auto error = TokenProvider_->MaybeFillToken(*dstSplit->mutable_select()->mutable_data_source_instance()); + if (error) { + return TIssue(std::move(error)); + } + } } // Start streaming @@ -471,35 +379,67 @@ namespace NYql::NDq { NConnector::IClient::TPtr Client_; TGenericTokenProvider::TPtr TokenProvider_; - NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_; - TVector Splits_; // accumulated list of table splits + + TVector Partitions_; + NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_; std::optional LastReadSplitsResponse_; bool ReadSplitsFinished_ = false; NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_; const NKikimr::NMiniKQL::THolderFactory& HolderFactory_; - Generic::TSource Source_; + NGeneric::TSource Source_; }; + void ExtractPartitionsFromParams( + TVector& partitions, + const THashMap& taskParams, // partitions are here in v1 + const TVector& readRanges // partitions are here in v2 + ) { + if (!readRanges.empty()) { + for (const auto& readRange : readRanges) { + NGeneric::TPartition partition; + YQL_ENSURE(partition.ParseFromString(readRange), "Failed to parse partition from read ranges"); + partitions.emplace_back(std::move(partition)); + } + } else { + const auto& iter = taskParams.find(GenericProviderName); + if (iter != taskParams.end()) { + NGeneric::TPartition partition; + TStringInput input(iter->first); + YQL_ENSURE(partition.ParseFromString(iter->second), "Failed to parse partition from task params"); + partitions.emplace_back(std::move(partition)); + } + } + + Y_ENSURE(!partitions.empty(), "partitions must not be empty"); + } + std::pair CreateGenericReadActor(NConnector::IClient::TPtr genericClient, - Generic::TSource&& source, + NGeneric::TSource&& source, ui64 inputIndex, TCollectStatsLevel statsLevel, const THashMap& /*secureParams*/, - const THashMap& /*taskParams*/, + ui64 taskId, + const THashMap& taskParams, + const TVector& readRanges, const NActors::TActorId& computeActorId, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const NKikimr::NMiniKQL::THolderFactory& holderFactory) { + TVector partitions; + ExtractPartitionsFromParams(partitions, taskParams, readRanges); + const auto dsi = source.select().data_source_instance(); YQL_CLOG(INFO, ProviderGeneric) << "Creating read actor with params:" << " kind=" << NYql::EGenericDataSourceKind_Name(dsi.kind()) << ", endpoint=" << dsi.endpoint().ShortDebugString() << ", database=" << dsi.database() << ", use_tls=" << ToString(dsi.use_tls()) - << ", protocol=" << NYql::EGenericProtocol_Name(dsi.protocol()); + << ", protocol=" << NYql::EGenericProtocol_Name(dsi.protocol()) + << ", task_id=" << taskId + << ", partitions_count=" << partitions.size(); // FIXME: strange piece of logic - authToken is created but not used: // https://a.yandex-team.ru/arcadia/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp?rev=r11550199#L140 @@ -512,23 +452,9 @@ namespace NYql::NDq { YQL_ENSURE(one != TString::npos && two != TString::npos && one < two, "Bad token format:" << token); */ - // Obtain token to access remote data source if necessary - // TODO: partitioning is not implemented now, but this code will be useful for the further research: - /* - TStringBuilder part; - if (const auto taskParamsIt = taskParams.find(GenericProviderName); taskParamsIt != taskParams.cend()) { - Generic::TRange range; - TStringInput input(taskParamsIt->second); - range.Load(&input); - if (const auto& r = range.GetRange(); !r.empty()) - part << ' ' << r; - } - part << ';'; - */ - auto tokenProvider = CreateGenericTokenProvider( source.GetToken(), - source.GetServiceAccountId(), + source.GetServiceAccountId(), source.GetServiceAccountIdSignature(), credentialsFactory); @@ -539,7 +465,8 @@ namespace NYql::NDq { std::move(tokenProvider), std::move(source), computeActorId, - holderFactory); + holderFactory, + std::move(partitions)); return {actor, actor}; } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h index 1bdb050dcd72..f0d58e8048c2 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.h @@ -9,10 +9,17 @@ namespace NYql::NDq { std::pair - CreateGenericReadActor(NConnector::IClient::TPtr genericClient, Generic::TSource&& params, ui64 inputIndex, - TCollectStatsLevel statsLevel, const THashMap& secureParams, - const THashMap& taskParams, const NActors::TActorId& computeActorId, - ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - const NKikimr::NMiniKQL::THolderFactory& holderFactory); + CreateGenericReadActor( + NConnector::IClient::TPtr genericClient, + NGeneric::TSource&& params, + ui64 inputIndex, + TCollectStatsLevel statsLevel, + const THashMap& secureParams, + ui64 taskId, + const THashMap& taskParams, + const TVector& readRanges, + const NActors::TActorId& computeActorId, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const NKikimr::NMiniKQL::THolderFactory& holderFactory); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp index 88d4e2636f6a..f47d18f63075 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.cpp @@ -20,6 +20,14 @@ namespace NYql::NConnector { std::shared_ptr> Streamer_; }; + TListSplitsStreamIteratorDrainer::TPtr MakeListSplitsStreamIteratorDrainer(IListSplitsStreamIterator::TPtr&& iterator) { + return std::make_shared(std::move(iterator)); + } + + TReadSplitsStreamIteratorDrainer::TPtr MakeReadSplitsStreamIteratorDrainer(IReadSplitsStreamIterator::TPtr&& iterator) { + return std::make_shared(std::move(iterator)); + } + class TClientGRPC: public IClient { public: TClientGRPC() = delete; @@ -83,7 +91,7 @@ namespace NYql::NConnector { typename NYdbGrpc::TSimpleRequestProcessor::TAsyncRequest rpc, TDuration timeout = {}) { auto context = GrpcClient_->CreateContext(); if (!context) { - throw yexception() << "Client is being shutted down"; + throw yexception() << "Client is being shutdown"; } auto promise = NThreading::NewPromise>(); @@ -113,7 +121,7 @@ namespace NYql::NConnector { auto context = GrpcClient_->CreateContext(); if (!context) { - throw yexception() << "Client is being shutted down"; + throw yexception() << "Client is being shutdown"; } GrpcConnection_->DoStreamRequest( diff --git a/ydb/library/yql/providers/generic/connector/libcpp/client.h b/ydb/library/yql/providers/generic/connector/libcpp/client.h index 6f40344d4c9c..929e6a6051c5 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/client.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/client.h @@ -1,10 +1,13 @@ #pragma once -#include +#include "error.h" + #include -#include #include #include +#include +#include +#include namespace NYql::NConnector { template @@ -28,7 +31,7 @@ namespace NYql::NConnector { TStreamer(TStreamProcessorPtr streamProcessor) : StreamProcessor_(streamProcessor) , Finished_(false) - {}; + {} TAsyncResult ReadNext(std::shared_ptr self) { auto promise = NThreading::NewPromise>(); @@ -67,12 +70,84 @@ namespace NYql::NConnector { virtual TAsyncResult ReadNext() = 0; - virtual ~IStreamIterator(){}; + virtual ~IStreamIterator() {} }; using IListSplitsStreamIterator = IStreamIterator; using IReadSplitsStreamIterator = IStreamIterator; + template + class IStreamIteratorDrainer: public std::enable_shared_from_this> { + public: + using TPtr = std::shared_ptr>; + + struct TBuffer { + TVector Responses; + TIssues Issues; + }; + + IStreamIteratorDrainer(IStreamIterator::TPtr&& iterator) + : Iterator_(std::move(iterator)) + { + } + + NThreading::TFuture Run() { + auto promise = NThreading::NewPromise(); + Next(promise); + return promise.GetFuture(); + } + + virtual ~IStreamIteratorDrainer() { + } + + private: + IStreamIterator::TPtr Iterator_; + + // Transport issues and stream messages received during stream flushing are accumulated here + TVector Responses_; + TIssues Issues_; + + void Next(NThreading::TPromise promise) { + TPtr self = this->shared_from_this(); + + Iterator_->ReadNext().Subscribe([self, promise](const TAsyncResult& f1) mutable { + TAsyncResult f2(f1); + auto result = f2.ExtractValue(); + + // Check transport error + if (!result.Status.Ok()) { + // It could be either EOF (== success), or unexpected error + if (!GrpcStatusEndOfStream(result.Status)) { + self->Issues_.AddIssue(result.Status.ToDebugString()); + } + + promise.SetValue(TBuffer{std::move(self->Responses_), std::move(self->Issues_)}); + return; + } + + // Check logic error + if (!NConnector::IsSuccess(*result.Response)) { + self->Issues_.AddIssues(NConnector::ErrorToIssues(result.Response->error())); + promise.SetValue(TBuffer{std::move(self->Responses_), std::move(self->Issues_)}); + return; + } + + Y_ENSURE(result.Response); + + self->Responses_.push_back(std::move(*result.Response)); + self->Next(promise); + }); + } + }; + + using TListSplitsStreamIteratorDrainer = IStreamIteratorDrainer; + using TReadSplitsStreamIteratorDrainer = IStreamIteratorDrainer; + + TListSplitsStreamIteratorDrainer::TPtr + MakeListSplitsStreamIteratorDrainer(IListSplitsStreamIterator::TPtr&& iterator); + TReadSplitsStreamIteratorDrainer::TPtr + MakeReadSplitsStreamIteratorDrainer(IReadSplitsStreamIterator::TPtr&& iterator); + template struct TIteratorResult { NYdbGrpc::TGrpcStatus Status; @@ -89,9 +164,12 @@ namespace NYql::NConnector { public: using TPtr = std::shared_ptr; - virtual TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request, TDuration timeout = {}) = 0; - virtual TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request, TDuration timeout = {}) = 0; - virtual TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request, TDuration timeout = {}) = 0; + virtual TDescribeTableAsyncResult DescribeTable(const NApi::TDescribeTableRequest& request, + TDuration timeout = {}) = 0; + virtual TListSplitsStreamIteratorAsyncResult ListSplits(const NApi::TListSplitsRequest& request, + TDuration timeout = {}) = 0; + virtual TReadSplitsStreamIteratorAsyncResult ReadSplits(const NApi::TReadSplitsRequest& request, + TDuration timeout = {}) = 0; virtual ~IClient() = default; }; diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h index 7e151179df2c..227643914a85 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h @@ -62,6 +62,8 @@ namespace NYql::NConnector::NTest { } MATCHER_P(ProtobufRequestMatcher, expected, "request does not match") { + Cerr << "CRAB Expected: " << expected.DebugString() << Endl; + Cerr << "CRAB Actual: " << arg.DebugString() << Endl; return google::protobuf::util::MessageDifferencer::Equals(arg, expected); } diff --git a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json index aa52c14f5576..05a29a7df8de 100644 --- a/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json +++ b/ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.json @@ -29,8 +29,7 @@ "Base": "TCallable", "Match": {"Type": "Callable", "Name": "GenTable"}, "Children": [ - {"Index": 0, "Name": "Name", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Splits", "Type": "TExprBase"} + {"Index": 0, "Name": "Name", "Type": "TCoAtom"} ] }, { diff --git a/ydb/library/yql/providers/generic/proto/partition.proto b/ydb/library/yql/providers/generic/proto/partition.proto new file mode 100644 index 000000000000..77f6aee08bbb --- /dev/null +++ b/ydb/library/yql/providers/generic/proto/partition.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +option cc_enable_arenas = true; + +package NYql.NGeneric; + +import "ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto"; + +message TPartition { + repeated NYql.NConnector.NApi.TSplit splits = 1; +} \ No newline at end of file diff --git a/ydb/library/yql/providers/generic/proto/range.proto b/ydb/library/yql/providers/generic/proto/range.proto deleted file mode 100644 index a28e4b45982b..000000000000 --- a/ydb/library/yql/providers/generic/proto/range.proto +++ /dev/null @@ -1,9 +0,0 @@ -syntax = "proto3"; - -package NYql.Generic; - -// FIXME: this is legacy left from the ClickHouse connector, -// it's not used directly now, but left for the further research. -message TRange { - string Range = 1; -} diff --git a/ydb/library/yql/providers/generic/proto/source.proto b/ydb/library/yql/providers/generic/proto/source.proto index a9a92d92e36b..7d4fcbec29f1 100644 --- a/ydb/library/yql/providers/generic/proto/source.proto +++ b/ydb/library/yql/providers/generic/proto/source.proto @@ -2,7 +2,7 @@ syntax = "proto3"; option cc_enable_arenas = true; -package NYql.Generic; +package NYql.NGeneric; import "ydb/library/yql/providers/generic/connector/api/service/protos/connector.proto"; import "yql/essentials/providers/common/proto/gateways_config.proto"; diff --git a/ydb/library/yql/providers/generic/proto/ya.make b/ydb/library/yql/providers/generic/proto/ya.make index 31880aadde41..d78a251edf8c 100644 --- a/ydb/library/yql/providers/generic/proto/ya.make +++ b/ydb/library/yql/providers/generic/proto/ya.make @@ -8,7 +8,7 @@ PEERDIR( ) SRCS( - range.proto + partition.proto source.proto ) diff --git a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp index 841de470544e..82052e8659d7 100644 --- a/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp +++ b/ydb/library/yql/providers/generic/provider/ut/pushdown/pushdown_ut.cpp @@ -73,6 +73,29 @@ struct TFakeDatabaseResolver: public IDatabaseAsyncResolver { } }; + +class TListSplitsIteratorMock: public NConnector::IListSplitsStreamIterator { +public: + TListSplitsIteratorMock() {} + + NConnector::TAsyncResult ReadNext() override { + NConnector::TResult result; + + if (!Responded_) { + result.Status = NYdbGrpc::TGrpcStatus(); // OK + result.Response = NConnector::NApi::TListSplitsResponse(); + result.Response->add_splits(); + Responded_ = true; + } else { + result.Status = NYdbGrpc::TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF"); + } + + return NThreading::MakeFuture>(std::move(result)); + } +private: + bool Responded_ = false; +}; + struct TFakeGenericClient: public NConnector::IClient { NConnector::TDescribeTableAsyncResult DescribeTable(const NConnector::NApi::TDescribeTableRequest& request, TDuration) override { UNIT_ASSERT_VALUES_EQUAL(request.table(), "test_table"); @@ -125,11 +148,13 @@ struct TFakeGenericClient: public NConnector::IClient { NConnector::TListSplitsStreamIteratorAsyncResult ListSplits(const NConnector::NApi::TListSplitsRequest& request, TDuration) override { Y_UNUSED(request); - try { - throw std::runtime_error("ListSplits unimplemented"); - } catch (...) { - return NThreading::MakeErrorFuture(std::current_exception()); - } + + NConnector::TIteratorResult iteratorResult{ + NYdbGrpc::TGrpcStatus(), + std::make_shared(), + }; + + return NThreading::MakeFuture>(std::move(iteratorResult)); } NConnector::TReadSplitsStreamIteratorAsyncResult ReadSplits(const NConnector::NApi::TReadSplitsRequest& request, TDuration) override { @@ -144,7 +169,7 @@ struct TFakeGenericClient: public NConnector::IClient { class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase { public: - explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, Generic::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt) + explicit TBuildDqSourceSettingsTransformer(TTypeAnnotationContext* types, NGeneric::TSource* dqSourceSettings, bool* dqSourceSettingsWereBuilt) : TOptimizeTransformerBase(types, NLog::EComponent::ProviderGeneric, {}) , DqSourceSettings_(dqSourceSettings) , DqSourceSettingsWereBuilt_(dqSourceSettingsWereBuilt) @@ -182,13 +207,13 @@ class TBuildDqSourceSettingsTransformer: public TOptimizeTransformerBase { TString sourceType; dqIntegration->FillSourceSettings(*dqSourceNode, settings, sourceType, 1, ctx); UNIT_ASSERT_STRINGS_EQUAL(sourceType, "PostgreSqlGeneric"); - UNIT_ASSERT(settings.Is()); + UNIT_ASSERT(settings.Is()); settings.UnpackTo(DqSourceSettings_); *DqSourceSettingsWereBuilt_ = true; } private: - Generic::TSource* DqSourceSettings_; + NGeneric::TSource* DqSourceSettings_; bool* DqSourceSettingsWereBuilt_; }; @@ -207,7 +232,7 @@ struct TPushdownFixture: public NUnitTest::TBaseFixture { TAutoPtr Transformer; TAutoPtr BuildDqSourceSettingsTransformer; - Generic::TSource DqSourceSettings; + NGeneric::TSource DqSourceSettings; bool DqSourceSettingsWereBuilt = false; TExprNode::TPtr InitialExprRoot; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp index eb273195a256..0ee66d3076f9 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_datasource_type_ann.cpp @@ -50,7 +50,7 @@ namespace NYql { } TStatus HandleTable(const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureArgsCount(*input, 2, ctx)) { + if (!EnsureArgsCount(*input, 1, ctx)) { return TStatus::Error; } @@ -59,6 +59,7 @@ namespace NYql { } input->SetTypeAnn(ctx.MakeType()); + return TStatus::Ok; } @@ -95,16 +96,18 @@ namespace NYql { columnSet.insert(child->Content()); } - auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos())); - if (issue.has_value()) { - ctx.AddError(issue.value()); + auto [tableMeta, issues] = State_->GetTable({clusterName, tableName}); + if (issues) { + for (const auto& issue : issues) { + ctx.AddError(issue); + } return TStatus::Error; } // Create type annotation TVector blockRowTypeItems; - const auto structExprType = tableMeta.value()->ItemType; + const auto structExprType = tableMeta->ItemType; for (const auto& item : structExprType->GetItems()) { // Filter out columns that are not required in this query if (columnSet.contains(item->GetName())) { @@ -186,14 +189,16 @@ namespace NYql { const auto tableName = table.Name().StringValue(); // Extract table metadata - auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos())); - if (issue.has_value()) { - ctx.AddError(issue.value()); + auto [tableMeta, issues] = State_->GetTable({clusterName, tableName}); + if (issues) { + for (const auto& issue : issues) { + ctx.AddError(issue); + } return TStatus::Error; } - auto itemType = tableMeta.value()->ItemType; - auto columnOrder = tableMeta.value()->ColumnOrder; + auto itemType = tableMeta->ItemType; + auto columnOrder = tableMeta->ColumnOrder; if (columnSet) { YQL_CLOG(INFO, ProviderGeneric) << "custom column set" << ColumnSetToString(*columnSet.Get()); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp index bb5b042b6734..04a36eb7a241 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_dq_integration.cpp @@ -1,5 +1,4 @@ #include "yql_generic_dq_integration.h" - #include "yql_generic_mkql_compiler.h" #include "yql_generic_predicate_pushdown.h" @@ -7,8 +6,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -40,7 +39,7 @@ namespace NYql { case NYql::EGenericDataSourceKind::LOGGING: return "LoggingGeneric"; default: - ythrow yexception() << "Data source kind is unknown or not specified"; + throw yexception() << "Data source kind is unknown or not specified"; } } @@ -106,13 +105,67 @@ namespace NYql { return read; } - ui64 Partition(const TExprNode&, TVector& partitions, TString*, TExprContext&, const TPartitionSettings&) override { + ui64 Partition( + const TExprNode& node, + TVector& partitions, + TString*, + TExprContext& ctx, + const TPartitionSettings& partitionSettings) override { + auto maybeDqSource = TMaybeNode(&node); + if (!maybeDqSource) { + return 0; + } + + auto srcSettings = maybeDqSource.Cast().Settings(); + auto maybeGenSourceSettings = TMaybeNode(srcSettings.Raw()); + Y_ENSURE(maybeGenSourceSettings); + auto genSourceSettings = maybeGenSourceSettings.Cast(); + + const TGenericState::TTableAddress tableAddress{ + genSourceSettings.Cluster().StringValue(), + genSourceSettings.Table().StringValue()}; + + // Extract table metadata from provider state>. + auto [tableMeta, issues] = State_->GetTable(tableAddress); + if (issues) { + for (const auto& issue : issues) { + ctx.AddError(issue); + } + + return 0; + } + + const size_t totalSplits = tableMeta->Splits.size(); + partitions.clear(); - Generic::TRange range; - partitions.emplace_back(); - TStringOutput out(partitions.back()); - range.Save(&out); - return 0ULL; + + if (totalSplits <= partitionSettings.MaxPartitions) { + // If there are not too many splits, simply make a single-split partitions. + for (size_t i = 0; i < totalSplits; i++) { + NGeneric::TPartition partition; + *partition.add_splits() = tableMeta->Splits[i]; + TString partitionStr; + YQL_ENSURE(partition.SerializeToString(&partitionStr), "Failed to serialize partition"); + partitions.emplace_back(std::move(partitionStr)); + } + } else { + // If the number of splits is greater than the partitions limit, + // we have to make split batches in each partition. + size_t splitsPerPartition = (totalSplits / partitionSettings.MaxPartitions - 1) + 1; + + for (size_t i = 0; i < totalSplits; i += splitsPerPartition) { + NGeneric::TPartition partition; + for (size_t j = i; j < i + splitsPerPartition && j < totalSplits; j++) { + *partition.add_splits() = tableMeta->Splits[j]; + } + TString partitionStr; + YQL_ENSURE(partition.SerializeToString(&partitionStr), "Failed to serialize partition"); + partitions.emplace_back(std::move(partitionStr)); + } + } + + // TODO: check what's the meaning of this value + return 0; } void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, @@ -121,29 +174,29 @@ namespace NYql { if (const auto maybeSettings = source.Settings().Maybe()) { const auto settings = maybeSettings.Cast(); const auto& clusterName = source.DataSource().Cast().Cluster().StringValue(); - const auto& table = settings.Table().StringValue(); + const auto& tableName = settings.Table().StringValue(); const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName]; const auto& endpoint = clusterConfig.endpoint(); - Generic::TSource source; + NGeneric::TSource source; YQL_CLOG(INFO, ProviderGeneric) << "Filling source settings" << ": cluster: " << clusterName - << ", table: " << table + << ", table: " << tableName << ", endpoint: " << endpoint.ShortDebugString(); const auto& columns = settings.Columns(); - auto [tableMeta, issue] = State_->GetTable(clusterName, table); - if (issue.has_value()) { - ythrow yexception() << "Get table metadata: " << issue.value(); + auto [tableMeta, issues] = State_->GetTable({clusterName, tableName}); + if (issues) { + throw yexception() << "Get table metadata: " << issues.ToOneLineString(); } // prepare select auto select = source.mutable_select(); - select->mutable_from()->set_table(TString(table)); - select->mutable_data_source_instance()->CopyFrom(tableMeta.value()->DataSourceInstance); + select->mutable_from()->set_table(TString(tableName)); + *select->mutable_data_source_instance() = tableMeta->DataSourceInstance; auto items = select->mutable_what()->mutable_items(); for (size_t i = 0; i < columns.Size(); i++) { @@ -153,14 +206,14 @@ namespace NYql { column->mutable_name()->assign(columnName); // assign column type - auto type = NConnector::GetColumnTypeByName(tableMeta.value()->Schema, columnName); - column->mutable_type()->CopyFrom(type); + auto type = NConnector::GetColumnTypeByName(tableMeta->Schema, columnName); + *column->mutable_type() = type; } if (auto predicate = settings.FilterPredicate(); !IsEmptyFilterPredicate(predicate)) { TStringBuilder err; if (!SerializeFilterPredicate(predicate, select->mutable_where()->mutable_filter_typed(), err)) { - ythrow yexception() << "Failed to serialize filter predicate for source: " << err; + throw yexception() << "Failed to serialize filter predicate for source: " << err; } } @@ -194,11 +247,11 @@ namespace NYql { const TGenSourceSettings settings = source.Settings().Cast(); const TString& clusterName = source.DataSource().Cast().Cluster().StringValue(); - const TString& table = settings.Table().StringValue(); - properties["Table"] = table; - auto [tableMeta, issue] = State_->GetTable(clusterName, table); + const TString& tableName = settings.Table().StringValue(); + properties["Table"] = tableName; + auto [tableMeta, issue] = State_->GetTable({clusterName, tableName}); if (!issue) { - const NYql::TGenericDataSourceInstance& dataSourceInstance = tableMeta.value()->DataSourceInstance; + const NYql::TGenericDataSourceInstance& dataSourceInstance = tableMeta->DataSourceInstance; switch (dataSourceInstance.kind()) { case NYql::EGenericDataSourceKind::CLICKHOUSE: properties["SourceType"] = "ClickHouse"; @@ -269,24 +322,24 @@ namespace NYql { const auto settings = wrap.Input().Cast(); const auto& clusterName = wrap.DataSource().Cast().Cluster().StringValue(); - const auto& table = settings.Table().StringValue(); + const auto& tableName = settings.Table().StringValue(); const auto& clusterConfig = State_->Configuration->ClusterNamesToClusterConfigs[clusterName]; const auto& endpoint = clusterConfig.endpoint(); YQL_CLOG(INFO, ProviderGeneric) << "Filling lookup source settings" << ": cluster: " << clusterName - << ", table: " << table + << ", table: " << tableName << ", endpoint: " << endpoint.ShortDebugString(); - auto [tableMeta, issue] = State_->GetTable(clusterName, table); - if (issue.has_value()) { - ythrow yexception() << "Get table metadata: " << issue.value(); + auto [tableMeta, issues] = State_->GetTable({clusterName, tableName}); + if (issues) { + throw yexception() << "Get table metadata: " << issues.ToOneLineString(); } - Generic::TLookupSource source; - source.set_table(table); - *source.mutable_data_source_instance() = tableMeta.value()->DataSourceInstance; + NGeneric::TLookupSource source; + source.set_table(tableName); + *source.mutable_data_source_instance() = tableMeta->DataSourceInstance; // Managed YDB supports access via IAM token. // If exist, copy service account creds to obtain tokens during request execution phase. diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp index 1628db86aca0..a723d615ccc4 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp @@ -1,3 +1,4 @@ +// clang-format off #include "yql_generic_provider_impl.h" #include @@ -25,16 +26,20 @@ namespace NYql { using namespace NKikimr; using namespace NKikimr::NMiniKQL; - class TGenericLoadTableMetadataTransformer: public TGraphTransformerBase { struct TTableDescription { using TPtr = std::shared_ptr; NYql::TGenericDataSourceInstance DataSourceInstance; - std::optional Response; + + std::optional Schema; + std::vector Splits; + + // Issues that could occure at any phase of network interaction with Connector + TIssues Issues; }; - using TDescribeTableMap = + using TTableDescriptionMap = std::unordered_map>; public: @@ -50,13 +55,14 @@ namespace NYql { return TStatus::Ok; } - std::unordered_set pendingTables; + std::unordered_set pendingTables; const auto& reads = FindNodes(input, [&](const TExprNode::TPtr& node) { if (const auto maybeRead = TMaybeNode(node)) { return maybeRead.Cast().DataSource().Category().Value() == GenericProviderName; } return false; }); + if (!reads.empty()) { for (const auto& r : reads) { const TGenRead read(r); @@ -88,39 +94,10 @@ namespace NYql { std::vector> handles; handles.reserve(pendingTables.size()); - Results_.reserve(pendingTables.size()); - - for (const auto& item : pendingTables) { - const auto& clusterName = item.first; - const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(clusterName); - YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it, "cluster not found: " << clusterName); - - NConnector::NApi::TDescribeTableRequest request; - FillDescribeTableRequest(request, it->second, item.second); - - auto promise = NThreading::NewPromise(); - handles.emplace_back(promise.GetFuture()); - - // preserve data source instance for the further usage - auto emplaceIt = Results_.emplace(std::make_pair(item, std::make_shared())); - auto desc = emplaceIt.first->second; - desc->DataSourceInstance = request.data_source_instance(); - - Y_ENSURE(State_->GenericClient); - State_->GenericClient->DescribeTable(request).Subscribe( - [desc = std::move(desc), promise = std::move(promise)](const NConnector::TDescribeTableAsyncResult& f1) mutable { - NConnector::TDescribeTableAsyncResult f2(f1); - auto result = f2.ExtractValueSync(); - - // Check only transport errors; - // logic errors will be checked later in DoApplyAsyncChanges - if (result.Status.Ok()) { - desc->Response = std::move(result.Response); - promise.SetValue(); - } else { - promise.SetException(result.Status.ToDebugString()); - } - }); + TableDescriptions_.reserve(pendingTables.size()); + + for (const auto& tableAddress : pendingTables) { + LoadTableMetadataFromConnector(tableAddress, handles); } if (handles.empty()) { @@ -130,11 +107,124 @@ namespace NYql { AsyncFuture_ = NThreading::WaitExceptionOrAll(handles); return TStatus::Async; } + // clang-format on + + private: + void LoadTableMetadataFromConnector(const TGenericState::TTableAddress& tableAddress, + std::vector>& handles) { + const auto it = State_->Configuration->ClusterNamesToClusterConfigs.find(tableAddress.ClusterName); + YQL_ENSURE(State_->Configuration->ClusterNamesToClusterConfigs.cend() != it, + "cluster not found: " << tableAddress.ClusterName); + + NConnector::NApi::TDescribeTableRequest request; + FillDescribeTableRequest(request, it->second, tableAddress.TableName); + + auto promise = NThreading::NewPromise(); + handles.emplace_back(promise.GetFuture()); + + // preserve data source instance for the further usage + auto emplaceIt = + TableDescriptions_.emplace(tableAddress, std::make_shared()); + auto desc = emplaceIt.first->second; + desc->DataSourceInstance = request.data_source_instance(); + + Y_ENSURE(State_->GenericClient); + + State_->GenericClient->DescribeTable(request).Subscribe( + [desc, tableAddress, promise, + client = State_->GenericClient](const NConnector::TDescribeTableAsyncResult& f1) mutable { + NConnector::TDescribeTableAsyncResult f2(f1); + auto result = f2.ExtractValueSync(); + + // Check transport error + if (!result.Status.Ok()) { + desc->Issues.AddIssue(TStringBuilder() + << "Call DescribeTable for table " << tableAddress.ToString() << ": " + << result.Status.ToDebugString()); + promise.SetValue(); + return; + } + + // Check logical error + if (!NConnector::IsSuccess(*result.Response)) { + desc->Issues.AddIssues(NConnector::ErrorToIssues( + result.Response->error(), + TStringBuilder() << "Call DescribeTable for table " << tableAddress.ToString() << ": ")); + promise.SetValue(); + return; + } + + // Preserve schema for the further usage + desc->Schema = result.Response->schema(); + + // Call ListSplits + NConnector::NApi::TListSplitsRequest request; + auto select = request.add_selects(); + *select->mutable_data_source_instance() = desc->DataSourceInstance; + select->mutable_from()->set_table(tableAddress.TableName); + + client->ListSplits(request).Subscribe( + [desc, promise, + tableAddress](const NConnector::TListSplitsStreamIteratorAsyncResult f3) mutable { + NConnector::TListSplitsStreamIteratorAsyncResult f4(f3); + auto streamIterResult = f4.ExtractValueSync(); + + // Check transport error + if (!streamIterResult.Status.Ok()) { + desc->Issues.AddIssue(TStringBuilder() + << "Call ListSplits for table " << tableAddress.ToString() << ": " + << streamIterResult.Status.ToDebugString()); + promise.SetValue(); + return; + } + + Y_ENSURE(streamIterResult.Iterator); + + auto drainer = + NConnector::MakeListSplitsStreamIteratorDrainer(std::move(streamIterResult.Iterator)); + + drainer->Run().Subscribe([desc, + promise, + tableAddress, + drainer // pass drainer to the callback because we want him to + // stay alive until the callback is called + ](const NThreading::TFuture& + f5) mutable { + NThreading::TFuture f6(f5); + auto drainerResult = f6.ExtractValueSync(); + + // check transport and logical errors + if (drainerResult.Issues) { + TIssue dstIssue(TStringBuilder() << "Call ListSplits for table " << tableAddress.ToString()); + for (const auto& srcIssue : drainerResult.Issues) { + dstIssue.AddSubIssue(MakeIntrusive(srcIssue)); + }; + desc->Issues.AddIssue(std::move(dstIssue)); + promise.SetValue(); + return; + } + + // collect all the splits from every response into a single vector + for (auto&& response : drainerResult.Responses) { + std::transform(std::make_move_iterator(response.mutable_splits()->begin()), + std::make_move_iterator(response.mutable_splits()->end()), + std::back_inserter(desc->Splits), + [](auto&& split) { return std::move(split); }); + } + + promise.SetValue(); + }); + }); + }); + } + public: NThreading::TFuture DoGetAsyncFuture(const TExprNode&) final { return AsyncFuture_; } + // TODO: for some reason engine calls this function more than once. + // It worth adding some checks to avoid multiple data copying. TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { AsyncFuture_.GetValue(); @@ -147,76 +237,70 @@ namespace NYql { TNodeOnNodeOwnedMap replaces(reads.size()); + // Iterate over all the requested tables, check Connector responses for (const auto& r : reads) { - TIssues issues = HandleDescribeTableResponse(r, ctx, replaces); - if (issues) { - for (const auto& issue : issues) { - ctx.AddError(issue); - } + const TGenRead genRead(r); + const auto clusterName = genRead.DataSource().Cluster().StringValue(); + const auto& keyArg = TExprBase(genRead.FreeArgs().Get(2).Ref().HeadPtr()).Cast().Ref().Head(); + const auto tableName = TString(keyArg.Tail().Head().Content()); + const TGenericState::TTableAddress tableAddress{clusterName, tableName}; + + // Find appropriate response + auto iter = TableDescriptions_.find(tableAddress); + if (iter == TableDescriptions_.end()) { + ctx.AddError(TIssue(ctx.GetPosition(genRead.Pos()), TStringBuilder() + << "Connector response not found for table " + << tableAddress.ToString())); return TStatus::Error; } - } - return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr)); - } - - void Rewind() final { - Results_.clear(); - AsyncFuture_ = {}; - } + auto& result = iter->second; - private: - TIssues HandleDescribeTableResponse( - const TIntrusivePtr& read, - TExprContext& ctx, - TNodeOnNodeOwnedMap& replaces - ) { - const TGenRead genRead(read); - const auto clusterName = genRead.DataSource().Cluster().StringValue(); - const auto& keyArg = TExprBase(genRead.FreeArgs().Get(2).Ref().HeadPtr()).Cast().Ref().Head(); - const auto tableName = TString(keyArg.Tail().Head().Content()); + // If errors occured during network interaction with Connector, return them + if (result->Issues) { + for (const auto& issue : result->Issues) { + ctx.AddError(issue); + } - const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName)); + return TStatus::Error; + } - if (it == Results_.cend()) { - TIssues issues; - issues.AddIssue(TIssue(ctx.GetPosition(genRead.Pos()), TStringBuilder() - << "Not found result for " << clusterName << '.' << tableName)); - return issues; - } + Y_ENSURE(result->Schema); + Y_ENSURE(result->Splits.size() > 0); - const auto& response = it->second->Response; + TGenericState::TTableMeta tableMeta; + tableMeta.Schema = *result->Schema; + tableMeta.DataSourceInstance = result->DataSourceInstance; + tableMeta.Splits = result->Splits; - if (!NConnector::IsSuccess(*response)) { - return NConnector::ErrorToIssues( - response->error(), - TStringBuilder() << "Loading metadata for table: " << clusterName << '.' << tableName - ); - } + // Parse table schema + ParseTableMeta(ctx, ctx.GetPosition(genRead.Pos()), tableAddress, tableMeta); - TGenericState::TTableMeta tableMeta; - tableMeta.Schema = response->schema(); - tableMeta.DataSourceInstance = it->second->DataSourceInstance; + // Fill AST for a table + if (const auto ins = replaces.emplace(genRead.Raw(), TExprNode::TPtr()); ins.second) { + ins.first->second = MakeTableMetaNode(ctx, genRead, tableName); + } - auto issues = ParseTableMeta(ctx, ctx.GetPosition(read->Pos()), clusterName, tableName, tableMeta); - if (issues) { - return issues; + // Save table metadata into provider state + State_->AddTable(tableAddress, std::move(tableMeta)); } - if (const auto ins = replaces.emplace(genRead.Raw(), TExprNode::TPtr()); ins.second) { - ins.first->second = MakeTableMetaNode(ctx, genRead, tableName); - } + return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr)); + } - State_->AddTable(clusterName, tableName, std::move(tableMeta)); - return TIssues{}; + // clang-format off + + void Rewind() final { + TableDescriptions_.clear(); + AsyncFuture_ = {}; } + private: TIssues ParseTableMeta( TExprContext& ctx, const TPosition& pos, - const std::string_view& cluster, - const std::string_view& table, + const TGenericState::TTableAddress& tableAddress, TGenericState::TTableMeta& tableMeta ) try { TVector items; @@ -224,7 +308,7 @@ namespace NYql { const auto& columns = tableMeta.Schema.columns(); if (columns.empty()) { TIssues issues; - issues.AddIssue(TIssue(pos, TStringBuilder() << "Table " << cluster << '.' << table << " doesn't exist.")); + issues.AddIssue(TIssue(pos, TStringBuilder() << "Table " << tableAddress.ToString() << " doesn't exist.")); return issues; } @@ -249,7 +333,7 @@ namespace NYql { TExprNode::TPtr MakeTableMetaNode( TExprContext& ctx, const TGenRead& read, - const TString& tableName + const TString& tableName ) { // clang-format off auto row = Build(ctx, read.Pos()) @@ -265,7 +349,7 @@ namespace NYql { auto table = Build(ctx, read.Pos()) .Name().Value(tableName).Build() - .Splits().Build().Done(); + .Done(); return Build(ctx, read.Pos()) .World(read.World()) @@ -277,8 +361,8 @@ namespace NYql { // clang-format on } - void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig, - const TString& tablePath) { + void FillDescribeTableRequest(NConnector::NApi::TDescribeTableRequest& request, + const TGenericClusterConfig& clusterConfig, const TString& tablePath) { const auto dataSourceKind = clusterConfig.GetKind(); auto dsi = request.mutable_data_source_instance(); *dsi->mutable_endpoint() = clusterConfig.GetEndpoint(); @@ -291,7 +375,8 @@ namespace NYql { FillTablePath(request, clusterConfig, tablePath); } - void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { + void FillCredentials(NConnector::NApi::TDescribeTableRequest& request, + const TGenericClusterConfig& clusterConfig) { auto dsi = request.mutable_data_source_instance(); // If login/password is provided, just copy them into request: @@ -305,9 +390,7 @@ namespace NYql { // 1. Client provided own IAM-token to access external data source auto iamToken = State_->Types->Credentials->FindCredentialContent( - "default_" + clusterConfig.name(), - "default_generic", - clusterConfig.GetToken()); + "default_" + clusterConfig.name(), "default_generic", clusterConfig.GetToken()); if (iamToken) { *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken; *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM"; @@ -317,8 +400,8 @@ namespace NYql { // 2. Client provided service account creds that must be converted into IAM-token Y_ENSURE(State_->CredentialsFactory, "CredentialsFactory is not initialized"); - auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth( - clusterConfig.GetServiceAccountId(), + auto structuredTokenJSON = TStructuredTokenBuilder() + .SetServiceAccountIdAuth(clusterConfig.GetServiceAccountId(), clusterConfig.GetServiceAccountIdSignature()) .ToJson(); @@ -329,13 +412,12 @@ namespace NYql { auto providersIt = State_->CredentialProviders.find(clusterConfig.name()); if (providersIt == State_->CredentialProviders.end()) { auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken( - State_->CredentialsFactory, - structuredTokenJSON, - false); + State_->CredentialsFactory, structuredTokenJSON, false); - providersIt = State_->CredentialProviders.emplace( - std::make_pair(clusterConfig.name(), credentialsProviderFactory->CreateProvider())) - .first; + providersIt = + State_->CredentialProviders + .emplace(clusterConfig.name(), credentialsProviderFactory->CreateProvider()) + .first; } iamToken = providersIt->second->GetAuthInfo(); @@ -373,7 +455,8 @@ namespace NYql { } } - void FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request, const TGenericClusterConfig& clusterConfig) { + void FillDataSourceOptions(NConnector::NApi::TDescribeTableRequest& request, + const TGenericClusterConfig& clusterConfig) { const auto dataSourceKind = clusterConfig.GetKind(); switch (dataSourceKind) { case NYql::EGenericDataSourceKind::CLICKHOUSE: @@ -401,8 +484,8 @@ namespace NYql { SetLoggingFolderId(*options, clusterConfig); } break; default: - ythrow yexception() << "Unexpected data source kind: '" << NYql::EGenericDataSourceKind_Name(dataSourceKind) - << "'"; + throw yexception() << "Unexpected data source kind: '" + << NYql::EGenericDataSourceKind_Name(dataSourceKind) << "'"; } } @@ -414,7 +497,7 @@ namespace NYql { } else if (dateTimeFormat == "YQL") { request.mutable_type_mapping_settings()->set_date_time_format(NConnector::NApi::YQL_FORMAT); } else { - ythrow yexception() << "Unexpected date/time format: '" << dateTimeFormat << "'"; + throw yexception() << "Unexpected date/time format: '" << dateTimeFormat << "'"; } } @@ -427,7 +510,8 @@ namespace NYql { private: const TGenericState::TPtr State_; - TDescribeTableMap Results_; + TTableDescriptionMap TableDescriptions_; + NThreading::TFuture AsyncFuture_; }; diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp index cce7d7dd5f6f..dcfcb9151570 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_physical_opt.cpp @@ -70,16 +70,20 @@ namespace NYql { const auto& read = maybe.Cast(); // Get table metadata - const auto [tableMeta, issue] = State_->GetTable( - read.DataSource().Cluster().Value(), - read.Table().Name().Value(), - ctx.GetPosition(node.Pos())); - if (issue.has_value()) { - ctx.AddError(issue.value()); + const auto [tableMeta, issues] = State_->GetTable( + TGenericState::TTableAddress( + TString(read.DataSource().Cluster().Value()), + TString(read.Table().Name().Value()) + ) + ); + if (issues) { + for (const auto& issue : issues) { + ctx.AddError(issue); + } return node; } - const auto structType = tableMeta.value()->ItemType; + const auto structType = tableMeta->ItemType; YQL_ENSURE(structType->GetSize()); auto columns = ctx.NewList(read.Pos(), {ctx.NewAtom(read.Pos(), GetLightColumn(*structType)->GetName())}); diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp index f5536d99ebf5..61e86bf982cb 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.cpp @@ -1,28 +1,20 @@ #include "yql_generic_state.h" namespace NYql { - void TGenericState::AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta) { - Tables_.emplace(TTableAddress(clusterName, tableName), tableMeta); + void TGenericState::AddTable(const TTableAddress& tableAddress, TTableMeta&& tableMeta) { + Tables_.emplace(tableAddress, std::move(tableMeta)); } - TGenericState::TGetTableResult TGenericState::GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const { - auto result = Tables_.FindPtr(TTableAddress(clusterName, tableName)); + TGenericState::TGetTableResult TGenericState::GetTable(const TTableAddress& tableAddress) const { + auto result = Tables_.FindPtr(tableAddress); if (result) { - return std::make_pair(result, std::nullopt); + return std::make_pair(result, TIssues{}); } - return std::make_pair( - std::nullopt, - TIssue(TStringBuilder() << "no metadata for table " << clusterName << "." << tableName)); - }; + TIssues issues; + issues.AddIssue(TIssue(TStringBuilder() << "no metadata for table " << tableAddress.ToString())); - TGenericState::TGetTableResult TGenericState::GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const { - auto pair = TGenericState::GetTable(clusterName, tableName); - if (pair.second.has_value()) { - pair.second->Position = position; - } - - return pair; + return std::make_pair(nullptr, std::move(issues)); } } // namespace NYql diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_state.h b/ydb/library/yql/providers/generic/provider/yql_generic_state.h index c1bab527a497..c981b31e637d 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_state.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_state.h @@ -15,16 +15,36 @@ namespace NYql { struct TGenericState: public TThrRefBase { using TPtr = TIntrusivePtr; - using TTableAddress = std::pair; // std::pair + struct TTableAddress { + TString ClusterName; + TString TableName; + + TString ToString() const { + return TStringBuilder() << "`" << ClusterName << "`.`" << TableName << "`"; + } + + bool operator==(const TTableAddress& other) const { + return ClusterName == other.ClusterName && TableName == other.TableName; + } + + explicit operator size_t() const { + return CombineHashes(std::hash()(ClusterName), std::hash()(TableName)); + } + }; struct TTableMeta { const TStructExprType* ItemType = nullptr; + // TODO: check why is it important TVector ColumnOrder; - NYql::NConnector::NApi::TSchema Schema; + // External datasource description NYql::TGenericDataSourceInstance DataSourceInstance; + // External table schema + NYql::NConnector::NApi::TSchema Schema; + // Contains some binary description of table splits (partitions) produced by Connector + std::vector Splits; }; - using TGetTableResult = std::pair, std::optional>; + using TGetTableResult = std::pair; TGenericState() = delete; @@ -45,9 +65,8 @@ namespace NYql { Configuration->Init(gatewayConfig, databaseResolver, DatabaseAuth, types->Credentials); } - void AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta); - TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const; - TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const; + void AddTable(const TTableAddress& tableAddress, TTableMeta&& tableMeta); + TGetTableResult GetTable(const TTableAddress& tableAddress) const; TTypeAnnotationContext* Types; TGenericConfiguration::TPtr Configuration = MakeIntrusive(); diff --git a/ydb/tests/fq/generic/analytics/test_join.py b/ydb/tests/fq/generic/analytics/test_join.py index 336a6cd673fd..d414097d568b 100644 --- a/ydb/tests/fq/generic/analytics/test_join.py +++ b/ydb/tests/fq/generic/analytics/test_join.py @@ -24,8 +24,7 @@ class TestJoinAnalytics: "mvp_external_ydb_endpoint", [{"endpoint": "tests-fq-generic-analytics-ydb:2136"}], indirect=True ) @pytest.mark.parametrize("fq_client", [{"folder_id": "my_folder"}], indirect=True) - @pytest.mark.parametrize("query_type", [fq.QueryContent.QueryType.ANALYTICS, fq.QueryContent.QueryType.STREAMING]) - def test_simple(self, fq_client: FederatedQueryClient, settings: Settings, query_type): + def test_simple(self, fq_client: FederatedQueryClient, settings: Settings): table_name = "join_table" ch_conn_name = f"ch_conn_{table_name}" gp_conn_name = f"gp_conn_{table_name}" @@ -76,7 +75,7 @@ def test_simple(self, fq_client: FederatedQueryClient, settings: Settings, query ORDER BY data_pg; """ - query_id = fq_client.create_query(query_name, sql, type=query_type).result.query_id + query_id = fq_client.create_query(query_name, sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id fq_client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) data = fq_client.get_result_data(query_id)