Skip to content

Commit

Permalink
merge fq ydb stable 2024 07 29 (#7215)
Browse files Browse the repository at this point in the history
Co-authored-by: Oleg Doronin <hcpp@rtmr-dev01.search.yandex.net>
  • Loading branch information
dorooleg and Oleg Doronin authored Jul 30, 2024
1 parent 2db2156 commit 71d6ac0
Show file tree
Hide file tree
Showing 20 changed files with 143 additions and 35 deletions.
6 changes: 5 additions & 1 deletion ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,11 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
mkqlDefaultLimit = 8_GB;
}

// This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
auto s3ReadDefaultInflightLimit = Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetDataInflight();
if (s3ReadDefaultInflightLimit == 0) {
s3ReadDefaultInflightLimit = Params.Config.GetGateways().GetS3().GetDataInflight();
}
if (s3ReadDefaultInflightLimit == 0) {
s3ReadDefaultInflightLimit = 200_MB;
}
Expand Down Expand Up @@ -1944,7 +1948,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {

{
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles()));
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
}

{
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/common/cache_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <ydb/services/ydb/ydb_common_ut.h>

using namespace NFq;
namespace NFq {

using TCache = TTtlCache<int,int>;

Expand Down Expand Up @@ -66,3 +66,5 @@ Y_UNIT_TEST_SUITE(Cache) {
UNIT_ASSERT_VALUES_EQUAL(cache.Size(), 0);
}
}

} // namespace NFq
20 changes: 4 additions & 16 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ void Init(
if (protoConfig.GetPrivateApi().GetEnabled()) {
const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig();
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxTime = TDuration::Max(), .RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg;
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());

// These overrides are kept for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {
readActorFactoryCfg.RowsInBatch = rowsInBatch;
}
Expand All @@ -206,22 +208,8 @@ void Init(
if (const ui64 dataInflight = s3readConfig.GetDataInflight()) {
readActorFactoryCfg.DataInflight = dataInflight;
}
for (auto& formatSizeLimit: protoConfig.GetGateways().GetS3().GetFormatSizeLimit()) {
if (formatSizeLimit.GetName()) { // ignore unnamed limits
readActorFactoryCfg.FormatSizeLimits.emplace(
formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit());
}
}
if (protoConfig.GetGateways().GetS3().HasFileSizeLimit()) {
readActorFactoryCfg.FileSizeLimit =
protoConfig.GetGateways().GetS3().GetFileSizeLimit();
}
if (protoConfig.GetGateways().GetS3().HasBlockFileSizeLimit()) {
readActorFactoryCfg.BlockFileSizeLimit =
protoConfig.GetGateways().GetS3().GetBlockFileSizeLimit();
}

RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(

if (federatedQuerySetup) {
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig);
RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);

if (federatedQuerySetup->ConnectorClient) {
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ namespace NKikimr::NKqp {

S3GatewayConfig = queryServiceConfig.GetS3();

S3ReadActorFactoryConfig = NYql::NDq::CreateReadActorFactoryConfig(S3GatewayConfig);

// Initialize Token Accessor
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
Expand Down Expand Up @@ -102,7 +104,8 @@ namespace NKikimr::NKqp {
CredentialsFactory,
nullptr,
S3GatewayConfig,
GenericGatewaysConfig};
GenericGatewaysConfig,
S3ReadActorFactoryConfig};

// Init DatabaseAsyncResolver only if all requirements are met
if (DatabaseResolverActorId && MdbEndpointGenerator &&
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/kqp/federated_query/kqp_federated_query_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>

namespace NKikimr::NKqp {

Expand All @@ -18,6 +19,7 @@ namespace NKikimr::NKqp {
NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver;
NYql::TS3GatewayConfig S3GatewayConfig;
NYql::TGenericGatewayConfig GenericGatewayConfig;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

struct IKqpFederatedQuerySetupFactory {
Expand Down Expand Up @@ -51,6 +53,7 @@ namespace NKikimr::NKqp {
NYql::NConnector::IClient::TPtr ConnectorClient;
std::optional<NActors::TActorId> DatabaseResolverActorId;
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
Expand All @@ -74,7 +77,7 @@ namespace NKikimr::NKqp {

std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
return TKqpFederatedQuerySetup{
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig};
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, S3ReadActorFactoryConfig};
}

private:
Expand All @@ -84,6 +87,7 @@ namespace NKikimr::NKqp {
NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver;
NYql::TS3GatewayConfig S3GatewayConfig;
NYql::TGenericGatewayConfig GenericGatewayConfig;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/node_service/kqp_node_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class KqpNode : public TTestBase {
Runtime->EnableScheduleForActor(ResourceManagerActorId, true);
WaitForBootstrap();

auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}});
auto FederatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}});
auto asyncIoFactory = CreateKqpAsyncIoFactory(KqpCounters, FederatedQuerySetup);
auto kqpNode = CreateKqpNodeService(config, KqpCounters, CompFactory.Get(), asyncIoFactory);
KqpNodeActorId = Runtime->Register(kqpNode);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings));
kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true);

auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}});
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, nullptr, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/library/yql/providers/common/proto/gateways_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ message TS3GatewayConfig {
optional uint64 RegexpCacheSize = 14;
optional uint64 GeneratorPathsLimit = 15;
optional uint64 MaxListingResultSizePerPartition = 16;
optional uint64 RowsInBatch = 17; // Default = 1000
optional uint64 MaxInflight = 18; // Default = 20
optional uint64 DataInflight = 19; // Default = 200 MB
optional bool AllowLocalFiles = 20;

repeated TAttr DefaultSettings = 100;
}
Expand Down
45 changes: 41 additions & 4 deletions ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,30 @@ struct TEvPrivate {
} // namespace

class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput {
// Monitoring counters owned by one PQ read actor instance.
//
// The counters live in the subtree
//   <counters> / sink=PqRead / tx_id=<TxId> / task_id=<taskId>
// and the tx_id subtree is dropped again when this object is destroyed.
struct TMetrics {
    // txId     - transaction id; every alternative of the variant is rendered with ToString.
    // taskId   - task id, used as the value of the innermost "task_id" label.
    // counters - root group under which the "sink=PqRead" subtree is created.
    TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters)
        : TxId(std::visit([](auto arg) { return ToString(arg); }, txId))
        , Counters(counters) {
        SubGroup = Counters->GetSubgroup("sink", "PqRead");
        auto sink = SubGroup->GetSubgroup("tx_id", TxId);
        auto task = sink->GetSubgroup("task_id", ToString(taskId));
        InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData");
        InFlySubscribe = task->GetCounter("InFlySubscribe");
        // NOTE(review): the `true` flag presumably marks the counter as a derivative (rate) - confirm
        // against TDynamicCounters::GetCounter.
        AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true);
    }

    ~TMetrics() {
        // The label name must be the one used in GetSubgroup above ("tx_id"); with a different name
        // nothing matches and the per-transaction subtree is never removed.
        // NOTE(review): the subtree is keyed by TxId only, so it is shared by all tasks of the same
        // transaction that use this counter root - confirm that dropping it when one task ends is intended.
        SubGroup->RemoveSubgroup("tx_id", TxId);
    }

    TString TxId;                                                     // stringified transaction id (label value)
    ::NMonitoring::TDynamicCounterPtr Counters;                       // root group passed to the constructor
    ::NMonitoring::TDynamicCounterPtr SubGroup;                       // "sink=PqRead" group, parent of the tx_id subtree
    ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
    ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
    ::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate;
};

public:
using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.

Expand All @@ -104,10 +128,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const NActors::TActorId& computeActorId,
i64 bufferSize,
const ::NMonitoring::TDynamicCounterPtr& counters,
bool rangesMode)
: TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc)
, InputIndex(inputIndex)
, TxId(txId)
, Metrics(txId, taskId, counters)
, BufferSize(bufferSize)
, RangesMode(rangesMode)
, HolderFactory(holderFactory)
Expand Down Expand Up @@ -244,9 +270,14 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
hFunc(TEvPrivate::TEvSourceDataReady, Handle);
)

void Handle(TEvPrivate::TEvSourceDataReady::TPtr&) {
void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) {
SRC_LOG_T("Source data ready");
SubscribedOnEvent = false;
if (ev.Get()->Cookie) {
Metrics.InFlySubscribe->Dec();
}
Metrics.InFlyAsyncInputData->Set(1);
Metrics.AsyncInputDataRate->Inc();
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}

Expand Down Expand Up @@ -281,6 +312,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
}

i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override {
Metrics.InFlyAsyncInputData->Set(0);
SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace);

const auto now = TInstant::Now();
Expand Down Expand Up @@ -384,9 +416,10 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
void SubscribeOnNextEvent() {
if (!SubscribedOnEvent) {
SubscribedOnEvent = true;
Metrics.InFlySubscribe->Inc();
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
EventFuture = GetReadSession().WaitEvent().Subscribe([actorSystem, selfId = SelfId()](const auto&){
actorSystem->Send(selfId, new TEvPrivate::TEvSourceDataReady());
actorSystem->Send(selfId, new TEvPrivate::TEvSourceDataReady(), 0, 1);
});
}
}
Expand Down Expand Up @@ -560,6 +593,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
const ui64 InputIndex;
TDqAsyncStats IngressStats;
const TTxId TxId;
TMetrics Metrics;
const i64 BufferSize;
const bool RangesMode;
const THolderFactory& HolderFactory;
Expand Down Expand Up @@ -596,6 +630,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
i64 bufferSize,
const ::NMonitoring::TDynamicCounterPtr& counters,
bool rangesMode
)
{
Expand All @@ -621,15 +656,16 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken),
computeActorId,
bufferSize,
counters,
rangesMode
);

return {actor, actor};
}

void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) {
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters, bool rangesMode) {
factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode](
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, rangesMode](
NPq::NProto::TDqPqTopicSource&& settings,
IDqAsyncIoFactory::TSourceArguments&& args)
{
Expand All @@ -647,6 +683,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv
args.ComputeActorId,
args.HolderFactory,
PQReadDefaultFreeSpace,
counters,
rangesMode);
});

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 bufferSize = PQReadDefaultFreeSpace,
bool rangesMode = true
);

void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true);
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), bool rangesMode = true);

} // namespace NYql::NDq
22 changes: 19 additions & 3 deletions ydb/library/yql/providers/s3/actors/yql_s3_decompressor_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
private:
bool nextImpl() final {
while (!Coro->InputFinished || !Coro->Requests.empty()) {
Coro->CpuTime += Coro->GetCpuTimeDelta();
Coro->ProcessOneEvent();
Coro->StartCycleCount = GetCycleCountFast();
if (Coro->InputBuffer) {
RawDataBuffer.swap(Coro->InputBuffer);
Coro->InputBuffer.clear();
Expand Down Expand Up @@ -65,6 +67,8 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
}

void Run() final {
StartCycleCount = GetCycleCountFast();

try {
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
NDB::ReadBuffer* buffer = coroBuffer.get();
Expand All @@ -74,15 +78,15 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
decompressorBuffer->nextIfAtEnd();
TString data{decompressorBuffer->available(), ' '};
decompressorBuffer->read(&data.front(), decompressorBuffer->available());
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data)));
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data), TakeCpuTimeDelta()));
}
} catch (const TDtorException&) {
// Stop any activity instantly
return;
} catch (...) {
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception(), TakeCpuTimeDelta()));
}
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish(TakeCpuTimeDelta()));
}

void ProcessOneEvent() {
Expand All @@ -99,7 +103,19 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
InputBuffer = std::move(event.Data);
}

// Returns the time elapsed since StartCycleCount was last stamped: the cycle-counter
// difference is converted to seconds by NHPTimer::GetSeconds and wrapped in a TDuration.
// Does not modify StartCycleCount or CpuTime.
TDuration GetCpuTimeDelta() {
    const auto elapsedSeconds = NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount);
    return TDuration::Seconds(elapsedSeconds);
}

// Hands out the CpuTime accumulated so far and resets the accumulator to zero,
// so consecutive calls never report the same interval twice.
TDuration TakeCpuTimeDelta() {
    const TDuration accumulated = CpuTime;
    CpuTime = TDuration::Zero();
    return accumulated;
}

private:
TDuration CpuTime;
ui64 StartCycleCount = 0;
TString InputBuffer;
TString Compression;
TActorId Parent;
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -937,10 +937,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}

// Handles one decompression result: adds the CPU time carried by the event to this
// coroutine's CpuTime and queues the event itself in DeferredDecompressedDataParts.
void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) {
// Read CpuTime first: the next line releases the event object out of `ev`.
CpuTime += ev->Get()->CpuTime;
DeferredDecompressedDataParts.push(std::move(ev->Release()));

}

void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) {
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr& ev) {
CpuTime += ev->Get()->CpuTime;
DecompressedInputFinished = true;
}

Expand Down
Loading

0 comments on commit 71d6ac0

Please sign in to comment.