Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge fq ydb stable 2024 07 29 #7215

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ydb/core/fq/libs/actors/run_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,11 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
mkqlDefaultLimit = 8_GB;
}

// This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
auto s3ReadDefaultInflightLimit = Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetDataInflight();
if (s3ReadDefaultInflightLimit == 0) {
s3ReadDefaultInflightLimit = Params.Config.GetGateways().GetS3().GetDataInflight();
}
if (s3ReadDefaultInflightLimit == 0) {
s3ReadDefaultInflightLimit = 200_MB;
}
Expand Down Expand Up @@ -1944,7 +1948,7 @@ class TRunActor : public NActors::TActorBootstrapped<TRunActor> {

{
dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory,
Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles()));
Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
}

{
Expand Down
20 changes: 4 additions & 16 deletions ydb/core/fq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ void Init(
if (protoConfig.GetPrivateApi().GetEnabled()) {
const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig();
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxTime = TDuration::Max(), .RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg;
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());

// These fillings were left for the backward compatibility. TODO: remove this part after migration to TS3GatewayConfig
if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) {
readActorFactoryCfg.RowsInBatch = rowsInBatch;
}
Expand All @@ -206,22 +208,8 @@ void Init(
if (const ui64 dataInflight = s3readConfig.GetDataInflight()) {
readActorFactoryCfg.DataInflight = dataInflight;
}
for (auto& formatSizeLimit: protoConfig.GetGateways().GetS3().GetFormatSizeLimit()) {
if (formatSizeLimit.GetName()) { // ignore unnamed limits
readActorFactoryCfg.FormatSizeLimits.emplace(
formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit());
}
}
if (protoConfig.GetGateways().GetS3().HasFileSizeLimit()) {
readActorFactoryCfg.FileSizeLimit =
protoConfig.GetGateways().GetS3().GetFileSizeLimit();
}
if (protoConfig.GetGateways().GetS3().HasBlockFileSizeLimit()) {
readActorFactoryCfg.BlockFileSizeLimit =
protoConfig.GetGateways().GetS3().GetBlockFileSizeLimit();
}

RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
yqCounters->GetSubgroup("subsystem", "S3ReadActor"));
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(

if (federatedQuerySetup) {
auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);
RegisterS3ReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy, federatedQuerySetup->S3ReadActorFactoryConfig);
RegisterS3WriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->HttpGateway, s3HttpRetryPolicy);

if (federatedQuerySetup->ConnectorClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ namespace NKikimr::NKqp {

S3GatewayConfig = queryServiceConfig.GetS3();

S3ReadActorFactoryConfig = NYql::NDq::CreateReadActorFactoryConfig(S3GatewayConfig);

// Initialize Token Accessor
if (appConfig.GetAuthConfig().HasTokenAccessorConfig()) {
const auto& tokenAccessorConfig = appConfig.GetAuthConfig().GetTokenAccessorConfig();
Expand Down Expand Up @@ -102,7 +104,8 @@ namespace NKikimr::NKqp {
CredentialsFactory,
nullptr,
S3GatewayConfig,
GenericGatewaysConfig};
GenericGatewaysConfig,
S3ReadActorFactoryConfig};

// Init DatabaseAsyncResolver only if all requirements are met
if (DatabaseResolverActorId && MdbEndpointGenerator &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h>

namespace NKikimr::NKqp {

Expand All @@ -18,6 +19,7 @@ namespace NKikimr::NKqp {
NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver;
NYql::TS3GatewayConfig S3GatewayConfig;
NYql::TGenericGatewayConfig GenericGatewayConfig;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

struct IKqpFederatedQuerySetupFactory {
Expand Down Expand Up @@ -51,6 +53,7 @@ namespace NKikimr::NKqp {
NYql::NConnector::IClient::TPtr ConnectorClient;
std::optional<NActors::TActorId> DatabaseResolverActorId;
NYql::IMdbEndpointGenerator::TPtr MdbEndpointGenerator;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

struct TKqpFederatedQuerySetupFactoryMock: public IKqpFederatedQuerySetupFactory {
Expand All @@ -74,7 +77,7 @@ namespace NKikimr::NKqp {

std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem*) override {
return TKqpFederatedQuerySetup{
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig};
HttpGateway, ConnectorClient, CredentialsFactory, DatabaseAsyncResolver, S3GatewayConfig, GenericGatewayConfig, S3ReadActorFactoryConfig};
}

private:
Expand All @@ -84,6 +87,7 @@ namespace NKikimr::NKqp {
NYql::IDatabaseAsyncResolver::TPtr DatabaseAsyncResolver;
NYql::TS3GatewayConfig S3GatewayConfig;
NYql::TGenericGatewayConfig GenericGatewayConfig;
NYql::NDq::TS3ReadActorFactoryConfig S3ReadActorFactoryConfig;
};

IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga
UNIT_ASSERT(TryParseFromTextFormat(defaultSettingsStream, defaultSettings));
kikimrConfig->Init(defaultSettings.GetDefaultSettings(), cluster, settings, true);

auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}});
auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}, {}});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, nullptr, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ message TS3GatewayConfig {
optional uint64 RegexpCacheSize = 14;
optional uint64 GeneratorPathsLimit = 15;
optional uint64 MaxListingResultSizePerPartition = 16;
optional uint64 RowsInBatch = 17; // Default = 1000
optional uint64 MaxInflight = 18; // Default = 20
optional uint64 DataInflight = 19; // Default = 200 MB
optional bool AllowLocalFiles = 20;

repeated TAttr DefaultSettings = 100;
}
Expand Down
45 changes: 41 additions & 4 deletions ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,30 @@ struct TEvPrivate {
} // namespace

class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqComputeActorAsyncInput {
struct TMetrics {
TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters)
: TxId(std::visit([](auto arg) { return ToString(arg); }, txId))
, Counters(counters) {
SubGroup = Counters->GetSubgroup("sink", "PqRead");
auto sink = SubGroup->GetSubgroup("tx_id", TxId);
auto task = sink->GetSubgroup("task_id", ToString(taskId));
InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData");
InFlySubscribe = task->GetCounter("InFlySubscribe");
AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true);
}

~TMetrics() {
SubGroup->RemoveSubgroup("id", TxId);
}

TString TxId;
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounterPtr SubGroup;
::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData;
::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe;
::NMonitoring::TDynamicCounters::TCounterPtr AsyncInputDataRate;
};

public:
using TPartitionKey = std::pair<TString, ui64>; // Cluster, partition id.

Expand All @@ -104,10 +128,12 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
const NActors::TActorId& computeActorId,
i64 bufferSize,
const ::NMonitoring::TDynamicCounterPtr& counters,
bool rangesMode)
: TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc)
, InputIndex(inputIndex)
, TxId(txId)
, Metrics(txId, taskId, counters)
, BufferSize(bufferSize)
, RangesMode(rangesMode)
, HolderFactory(holderFactory)
Expand Down Expand Up @@ -244,9 +270,14 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
hFunc(TEvPrivate::TEvSourceDataReady, Handle);
)

void Handle(TEvPrivate::TEvSourceDataReady::TPtr&) {
void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) {
SRC_LOG_T("Source data ready");
SubscribedOnEvent = false;
if (ev.Get()->Cookie) {
Metrics.InFlySubscribe->Dec();
}
Metrics.InFlyAsyncInputData->Set(1);
Metrics.AsyncInputDataRate->Inc();
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}

Expand Down Expand Up @@ -281,6 +312,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
}

i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override {
Metrics.InFlyAsyncInputData->Set(0);
SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace);

const auto now = TInstant::Now();
Expand Down Expand Up @@ -384,9 +416,10 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
void SubscribeOnNextEvent() {
if (!SubscribedOnEvent) {
SubscribedOnEvent = true;
Metrics.InFlySubscribe->Inc();
NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
EventFuture = GetReadSession().WaitEvent().Subscribe([actorSystem, selfId = SelfId()](const auto&){
actorSystem->Send(selfId, new TEvPrivate::TEvSourceDataReady());
actorSystem->Send(selfId, new TEvPrivate::TEvSourceDataReady(), 0, 1);
});
}
}
Expand Down Expand Up @@ -560,6 +593,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public IDqCompute
const ui64 InputIndex;
TDqAsyncStats IngressStats;
const TTxId TxId;
TMetrics Metrics;
const i64 BufferSize;
const bool RangesMode;
const THolderFactory& HolderFactory;
Expand Down Expand Up @@ -596,6 +630,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
i64 bufferSize,
const ::NMonitoring::TDynamicCounterPtr& counters,
bool rangesMode
)
{
Expand All @@ -621,15 +656,16 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken),
computeActorId,
bufferSize,
counters,
rangesMode
);

return {actor, actor};
}

void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode) {
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters, bool rangesMode) {
factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), rangesMode](
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, rangesMode](
NPq::NProto::TDqPqTopicSource&& settings,
IDqAsyncIoFactory::TSourceArguments&& args)
{
Expand All @@ -647,6 +683,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv
args.ComputeActorId,
args.HolderFactory,
PQReadDefaultFreeSpace,
counters,
rangesMode);
});

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const NActors::TActorId& computeActorId,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const ::NMonitoring::TDynamicCounterPtr& counters,
i64 bufferSize = PQReadDefaultFreeSpace,
bool rangesMode = true
);

void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool rangesMode = true);
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), bool rangesMode = true);

} // namespace NYql::NDq
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
private:
bool nextImpl() final {
while (!Coro->InputFinished || !Coro->Requests.empty()) {
Coro->CpuTime += Coro->GetCpuTimeDelta();
Coro->ProcessOneEvent();
Coro->StartCycleCount = GetCycleCountFast();
if (Coro->InputBuffer) {
RawDataBuffer.swap(Coro->InputBuffer);
Coro->InputBuffer.clear();
Expand Down Expand Up @@ -65,6 +67,8 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
}

void Run() final {
StartCycleCount = GetCycleCountFast();

try {
std::unique_ptr<NDB::ReadBuffer> coroBuffer = std::make_unique<TCoroReadBuffer>(this);
NDB::ReadBuffer* buffer = coroBuffer.get();
Expand All @@ -74,15 +78,15 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
decompressorBuffer->nextIfAtEnd();
TString data{decompressorBuffer->available(), ' '};
decompressorBuffer->read(&data.front(), decompressorBuffer->available());
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data)));
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::move(data), TakeCpuTimeDelta()));
}
} catch (const TDtorException&) {
// Stop any activity instantly
return;
} catch (...) {
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception()));
Send(Parent, new TEvS3Provider::TEvDecompressDataResult(std::current_exception(), TakeCpuTimeDelta()));
}
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish());
Send(Parent, new TEvS3Provider::TEvDecompressDataFinish(TakeCpuTimeDelta()));
}

void ProcessOneEvent() {
Expand All @@ -99,7 +103,19 @@ class TS3DecompressorCoroImpl : public TActorCoroImpl {
InputBuffer = std::move(event.Data);
}

TDuration GetCpuTimeDelta() {
return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - StartCycleCount));
}

TDuration TakeCpuTimeDelta() {
auto currentCpuTime = CpuTime;
CpuTime = TDuration::Zero();
return currentCpuTime;
}

private:
TDuration CpuTime;
ui64 StartCycleCount = 0;
TString InputBuffer;
TString Compression;
TActorId Parent;
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -937,10 +937,13 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}

void Handle(TEvS3Provider::TEvDecompressDataResult::TPtr& ev) {
CpuTime += ev->Get()->CpuTime;
DeferredDecompressedDataParts.push(std::move(ev->Release()));

}

void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr&) {
void Handle(TEvS3Provider::TEvDecompressDataFinish::TPtr& ev) {
CpuTime += ev->Get()->CpuTime;
DecompressedInputFinished = true;
}

Expand Down
26 changes: 26 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,30 @@ void RegisterS3ReadActorFactory(
#endif
}

TS3ReadActorFactoryConfig CreateReadActorFactoryConfig(const ::NYql::TS3GatewayConfig& s3Config) {
TS3ReadActorFactoryConfig s3ReadActoryConfig;
if (const ui64 rowsInBatch = s3Config.GetRowsInBatch()) {
s3ReadActoryConfig.RowsInBatch = rowsInBatch;
}
if (const ui64 maxInflight = s3Config.GetMaxInflight()) {
s3ReadActoryConfig.MaxInflight = maxInflight;
}
if (const ui64 dataInflight = s3Config.GetDataInflight()) {
s3ReadActoryConfig.DataInflight = dataInflight;
}
for (auto& formatSizeLimit: s3Config.GetFormatSizeLimit()) {
if (formatSizeLimit.GetName()) { // ignore unnamed limits
s3ReadActoryConfig.FormatSizeLimits.emplace(
formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit());
}
}
if (s3Config.HasFileSizeLimit()) {
s3ReadActoryConfig.FileSizeLimit = s3Config.GetFileSizeLimit();
}
if (s3Config.HasBlockFileSizeLimit()) {
s3ReadActoryConfig.BlockFileSizeLimit = s3Config.GetBlockFileSizeLimit();
}
return s3ReadActoryConfig;
}

}
2 changes: 2 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ void RegisterS3ReadActorFactory(
const TS3ReadActorFactoryConfig& = {},
::NMonitoring::TDynamicCounterPtr counters = nullptr);

TS3ReadActorFactoryConfig CreateReadActorFactoryConfig(const ::NYql::TS3GatewayConfig& s3Config);

}
Loading
Loading