diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index 80790ccb9f7e..f85dc9733c15 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -781,7 +781,11 @@ class TRunActor : public NActors::TActorBootstrapped { mkqlDefaultLimit = 8_GB; } + // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig auto s3ReadDefaultInflightLimit = Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetDataInflight(); + if (s3ReadDefaultInflightLimit == 0) { + s3ReadDefaultInflightLimit = Params.Config.GetGateways().GetS3().GetDataInflight(); + } if (s3ReadDefaultInflightLimit == 0) { s3ReadDefaultInflightLimit = 200_MB; } @@ -1944,7 +1948,7 @@ class TRunActor : public NActors::TActorBootstrapped { { dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory, - Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles())); + Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig } { diff --git a/ydb/core/fq/libs/common/cache_ut.cpp b/ydb/core/fq/libs/common/cache_ut.cpp index 892fafef1ab8..8b02009eae3f 100644 --- a/ydb/core/fq/libs/common/cache_ut.cpp +++ b/ydb/core/fq/libs/common/cache_ut.cpp @@ -2,7 +2,7 @@ #include -using namespace NFq; +namespace NFq { using TCache = TTtlCache; @@ -66,3 +66,5 @@ Y_UNIT_TEST_SUITE(Cache) { UNIT_ASSERT_VALUES_EQUAL(cache.Size(), 0); } } + +} // namespace NFq diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 60d61281212f..b4dfcdda51f6 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -196,7 +196,9 @@ void Init( if (protoConfig.GetPrivateApi().GetEnabled()) { const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig(); auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxTime = TDuration::Max(), .RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); - NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg; + NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3()); + + // These fillings were left for the backward compatibility. TODO: remove this part after migration to TS3GatewayConfig if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) { readActorFactoryCfg.RowsInBatch = rowsInBatch; } @@ -206,22 +208,8 @@ void Init( if (const ui64 dataInflight = s3readConfig.GetDataInflight()) { readActorFactoryCfg.DataInflight = dataInflight; } - for (auto& formatSizeLimit: protoConfig.GetGateways().GetS3().GetFormatSizeLimit()) { - if (formatSizeLimit.GetName()) { // ignore unnamed limits - readActorFactoryCfg.FormatSizeLimits.emplace( - formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit()); - } - } - if (protoConfig.GetGateways().GetS3().HasFileSizeLimit()) { - readActorFactoryCfg.FileSizeLimit = - protoConfig.GetGateways().GetS3().GetFileSizeLimit(); - } - if (protoConfig.GetGateways().GetS3().HasBlockFileSizeLimit()) { - readActorFactoryCfg.BlockFileSizeLimit = - protoConfig.GetGateways().GetS3().GetBlockFileSizeLimit(); - } - RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); + RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg, yqCounters->GetSubgroup("subsystem", "S3ReadActor")); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index b8af3f80b7d2..51502f75f432 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -73,7 +73,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( if (federatedQuerySetup) { auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); - RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy); + RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig); RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy); if (federatedQuerySetup->ConnectorClient) { diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp index aa6f8b3ac855..0970763e4bb1 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp @@ -53,6 +53,8 @@ namespace NKikimr::NKqp { S3GatewayConfig = queryServiceConfig.GetS3(); + S3ReadActorFactoryConfig = NYql::NDq::CreateReadActorFactoryConfig(S3GatewayConfig); + // Initialize Token Accessor if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) { const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig(); @@ -102,7 +104,8 @@ namespace NKikimr::NKqp { CredentialsFactory, nullptr, S3GatewayConfig, - GenericGatewaysConfig}; + GenericGatewaysConfig, + S3ReadActorFactoryConfig}; // Init DatabaseAsyncResolver only if all requirements are met if (DatabaseResolverActorId && MdbEndpointGenerator && diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h index 2db1e5a9f725..ad051cdcdc73 100644 --- a/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h +++ b/ydb/core/kqp/federated_query/kqp_federated_query_helpers.h @@ -8,6 +8,7 @@ #include #include #include +#include namespace NKikimr::NKqp { @@ -18,6 +19,7 @@ namespace NKikimr::NKqp { NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver; NYql::TS3GatewayConfig S3GatewayConfig; NYql::TGenericGatewayConfig GenericGatewayConfig; + NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig; }; struct IKqpFederatedQuerySetupFactory { @@ -51,6 +53,7 @@ namespace NKikimr::NKqp { NYql::NConnector::IClient::TPtr ConnectorClient; std::optional DatabaseResolverActorId; NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator; + NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig; }; struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory { @@ -74,7 +77,7 @@ namespace NKikimr::NKqp { std::optional Make(NActors::TActorSystem*) override { return TKqpFederatedQuerySetup{ - HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig}; + HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, S3ReadActorFactoryConfig}; } private: @@ -84,6 +87,7 @@ namespace NKikimr::NKqp { NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver; NYql::TS3GatewayConfig S3GatewayConfig; NYql::TGenericGatewayConfig GenericGatewayConfig; + NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig; }; IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory( diff --git a/ydb/core/kqp/node_service/kqp_node_ut.cpp b/ydb/core/kqp/node_service/kqp_node_ut.cpp index fdcca8f068c4..b0040e603770 100644 --- a/ydb/core/kqp/node_service/kqp_node_ut.cpp +++ b/ydb/core/kqp/node_service/kqp_node_ut.cpp @@ -184,7 +184,7 @@ class KqpNode : public TTestBase { Runtime->EnableScheduleForActor(ResourceManagerActorId, true); WaitForBootstrap(); - auto FederatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}}); + auto FederatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}}); auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, FederatedQuerySetup); auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory); KqpNodeActorId = Runtime->Register(kqpNode); diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 7e0fbc7c16c9..71b6d3ff747a 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -53,7 +53,7 @@ TIntrusivePtr CreateKikimrQueryProcessor(TIntrusivePtr ga UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings)); kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true); - auto federatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}}); + auto federatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}}); return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver, federatedQuerySetup, nullptr, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem); } diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 1838bdd39710..f01dadec1282 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -398,6 +398,10 @@ message TS3GatewayConfig { optional uint64 RegexpCacheSize = 14; optional uint64 GeneratorPathsLimit = 15; optional uint64 MaxListingResultSizePerPartition = 16; + optional uint64 RowsInBatch = 17; // Default = 1000 + optional uint64 MaxInflight = 18; // Default = 20 + optional uint64 DataInflight = 19; // Default = 200 MB + optional bool AllowLocalFiles = 20; repeated TAttr DefaultSettings = 100; } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index bb3bd00d4388..3cb92c22e210 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -89,6 +89,30 @@ struct TEvPrivate { } // namespace class TDqPqReadActor : public NActors::TActor, public IDqComputeActorAsyncInput { + struct TMetrics { + TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters) + : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) + , Counters(counters) { + SubGroup = Counters->GetSubgroup("sink", "PqRead"); + auto sink = SubGroup->GetSubgroup("tx_id", TxId); + auto task = sink->GetSubgroup("task_id", ToString(taskId)); + InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData"); + InFlySubscribe = task->GetCounter("InFlySubscribe"); + AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true); + } + + ~TMetrics() { + SubGroup->RemoveSubgroup("id", TxId); + } + + TString TxId; + ::NMonitoring::TDynamicCounterPtr Counters; + ::NMonitoring::TDynamicCounterPtr SubGroup; + ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData; + ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe; + ::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate; + }; + public: using TPartitionKey = std::pair; // Cluster, partition id. @@ -104,10 +128,12 @@ class TDqPqReadActor : public NActors::TActor, public IDqCompute std::shared_ptr credentialsProviderFactory, const NActors::TActorId& computeActorId, i64 bufferSize, + const ::NMonitoring::TDynamicCounterPtr& counters, bool rangesMode) : TActor(&TDqPqReadActor::StateFunc) , InputIndex(inputIndex) , TxId(txId) + , Metrics(txId, taskId, counters) , BufferSize(bufferSize) , RangesMode(rangesMode) , HolderFactory(holderFactory) @@ -244,9 +270,14 @@ class TDqPqReadActor : public NActors::TActor, public IDqCompute hFunc(TEvPrivate::TEvSourceDataReady, Handle); ) - void Handle(TEvPrivate::TEvSourceDataReady::TPtr&) { + void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) { SRC_LOG_T("Source data ready"); SubscribedOnEvent = false; + if (ev.Get()->Cookie) { + Metrics.InFlySubscribe->Dec(); + } + Metrics.InFlyAsyncInputData->Set(1); + Metrics.AsyncInputDataRate->Inc(); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } @@ -281,6 +312,7 @@ class TDqPqReadActor : public NActors::TActor, public IDqCompute } i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe& watermark, bool&, i64 freeSpace) override { + Metrics.InFlyAsyncInputData->Set(0); SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace); const auto now = TInstant::Now(); @@ -384,9 +416,10 @@ class TDqPqReadActor : public NActors::TActor, public IDqCompute void SubscribeOnNextEvent() { if (!SubscribedOnEvent) { SubscribedOnEvent = true; + Metrics.InFlySubscribe->Inc(); NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); EventFuture = GetReadSession().WaitEvent().Subscribe([actorSystem, selfId = SelfId()](const auto&){ - actorSystem->Send(selfId, new TEvPrivate::TEvSourceDataReady()); + actorSystem->Send(selfId, new TEvPrivate::TEvSourceDataReady(), 0, 1); }); } } @@ -560,6 +593,7 @@ class TDqPqReadActor : public NActors::TActor, public IDqCompute const ui64 InputIndex; TDqAsyncStats IngressStats; const TTxId TxId; + TMetrics Metrics; const i64 BufferSize; const bool RangesMode; const THolderFactory& HolderFactory; @@ -596,6 +630,7 @@ std::pair CreateDqPqReadActor( const NActors::TActorId& computeActorId, const NKikimr::NMiniKQL::THolderFactory& holderFactory, i64 bufferSize, + const ::NMonitoring::TDynamicCounterPtr& counters, bool rangesMode ) { @@ -621,15 +656,16 @@ std::pair CreateDqPqReadActor( CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken), computeActorId, bufferSize, + counters, rangesMode ); return {actor, actor}; } -void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) { +void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters, bool rangesMode) { factory.RegisterSource("PqSource", - [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode]( + [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, rangesMode]( NPq::NProto::TDqPqTopicSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { @@ -647,6 +683,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv args.ComputeActorId, args.HolderFactory, PQReadDefaultFreeSpace, + counters, rangesMode); }); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h index 03cbd35a1a00..54d78879933b 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h @@ -34,10 +34,11 @@ std::pair CreateDqPqReadActor( ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const NActors::TActorId& computeActorId, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize = PQReadDefaultFreeSpace, bool rangesMode = true ); -void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true); +void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), bool rangesMode = true); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp index 738ba1f6fb7c..5d56383c7475 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp @@ -33,7 +33,9 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl { private: bool nextImpl() final { while (!Coro->InputFinished || !Coro->Requests.empty()) { + Coro->CpuTime += Coro->GetCpuTimeDelta(); Coro->ProcessOneEvent(); + Coro->StartCycleCount = GetCycleCountFast(); if (Coro->InputBuffer) { RawDataBuffer.swap(Coro->InputBuffer); Coro->InputBuffer.clear(); @@ -65,6 +67,8 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl { } void Run() final { + StartCycleCount = GetCycleCountFast(); + try { std::unique_ptr coroBuffer = std::make_unique(this); NDB::ReadBuffer* buffer = coroBuffer.get(); @@ -74,15 +78,15 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl { decompressorBuffer->nextIfAtEnd(); TString data{decompressorBuffer->available(), ' '}; decompressorBuffer->read(&data.front(), decompressorBuffer->available()); - Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data))); + Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data), TakeCpuTimeDelta())); } } catch (const TDtorException&) { // Stop any activity instantly return; } catch (...) { - Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception())); + Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception(), TakeCpuTimeDelta())); } - Send(Parent, new TEvS3Provider::TEvDecompressDataFinish()); + Send(Parent, new TEvS3Provider::TEvDecompressDataFinish(TakeCpuTimeDelta())); } void ProcessOneEvent() { @@ -99,7 +103,19 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl { InputBuffer = std::move(event.Data); } + TDuration GetCpuTimeDelta() { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount)); + } + + TDuration TakeCpuTimeDelta() { + auto currentCpuTime = CpuTime; + CpuTime = TDuration::Zero(); + return currentCpuTime; + } + private: + TDuration CpuTime; + ui64 StartCycleCount = 0; TString InputBuffer; TString Compression; TActorId Parent; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index cb2ead645748..9698745ae77d 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -937,10 +937,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl { } void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) { + CpuTime += ev->Get()->CpuTime; DeferredDecompressedDataParts.push(std::move(ev->Release())); + } - void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) { + void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr& ev) { + CpuTime += ev->Get()->CpuTime; DecompressedInputFinished = true; } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp index 03938548a60f..a43a46332b75 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp @@ -34,4 +34,30 @@ void RegisterS3ReadActorFactory( #endif } +TS3ReadActorFactoryConfig CreateReadActorFactoryConfig(const ::NYql::TS3GatewayConfig& s3Config) { + TS3ReadActorFactoryConfig s3ReadActoryConfig; + if (const ui64 rowsInBatch = s3Config.GetRowsInBatch()) { + s3ReadActoryConfig.RowsInBatch = rowsInBatch; + } + if (const ui64 maxInflight = s3Config.GetMaxInflight()) { + s3ReadActoryConfig.MaxInflight = maxInflight; + } + if (const ui64 dataInflight = s3Config.GetDataInflight()) { + s3ReadActoryConfig.DataInflight = dataInflight; + } + for (auto& formatSizeLimit: s3Config.GetFormatSizeLimit()) { + if (formatSizeLimit.GetName()) { // ignore unnamed limits + s3ReadActoryConfig.FormatSizeLimits.emplace( + formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit()); + } + } + if (s3Config.HasFileSizeLimit()) { + s3ReadActoryConfig.FileSizeLimit = s3Config.GetFileSizeLimit(); + } + if (s3Config.HasBlockFileSizeLimit()) { + s3ReadActoryConfig.BlockFileSizeLimit = s3Config.GetBlockFileSizeLimit(); + } + return s3ReadActoryConfig; +} + } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 89bfc5009c41..e50dae22d1f9 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -29,4 +29,6 @@ void RegisterS3ReadActorFactory( const TS3ReadActorFactoryConfig& = {}, ::NMonitoring::TDynamicCounterPtr counters = nullptr); +TS3ReadActorFactoryConfig CreateReadActorFactoryConfig(const ::NYql::TS3GatewayConfig& s3Config); + } diff --git a/ydb/library/yql/providers/s3/events/events.h b/ydb/library/yql/providers/s3/events/events.h index cbf4f4192bcf..f37b252a9a62 100644 --- a/ydb/library/yql/providers/s3/events/events.h +++ b/ydb/library/yql/providers/s3/events/events.h @@ -207,13 +207,27 @@ struct TEvS3Provider { }; struct TEvDecompressDataResult : public NActors::TEventLocal { - TEvDecompressDataResult(TString&& data) : Data(std::move(data)) {} - TEvDecompressDataResult(std::exception_ptr exception) : Exception(exception) {} + TEvDecompressDataResult(TString&& data, const TDuration& cpuTime) + : Data(std::move(data)) + , CpuTime(cpuTime) + {} + + TEvDecompressDataResult(std::exception_ptr exception, const TDuration& cpuTime) + : Exception(exception) + , CpuTime(cpuTime) + {} + TString Data; std::exception_ptr Exception; + TDuration CpuTime; }; struct TEvDecompressDataFinish : public NActors::TEventLocal { + TEvDecompressDataFinish(const TDuration& cpuTime) + : CpuTime(cpuTime) + {} + + TDuration CpuTime; }; struct TReadRange { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 8459ec134ddd..96dd09a14184 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -452,7 +452,7 @@ class TS3DqIntegration: public TDqIntegrationBase { paths.clear(); ReadPathsList(srcDesc, {}, serialized, paths); - NDq::TS3ReadActorFactoryConfig readActorConfig; + const NDq::TS3ReadActorFactoryConfig& readActorConfig = State_->Configuration->S3ReadActorFactoryConfig; ui64 fileSizeLimit = readActorConfig.FileSizeLimit; if (srcDesc.HasFormat()) { if (auto it = readActorConfig.FormatSizeLimits.find(srcDesc.GetFormat()); it != readActorConfig.FormatSizeLimits.end()) { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h index 86f1a4ef3d7d..42da0f693566 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "yql_s3_settings.h" diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index 587ead5cf7f2..656c002c041f 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -45,6 +45,7 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr #include +#include #include @@ -68,6 +69,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher bool WriteThroughDqIntegration = false; ui64 MaxListingResultSizePerPhysicalPartition; bool AllowAtomicUploadCommit = true; + NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig; }; } // NYql