Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ using namespace NYql::NDq;
class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TDqTaskRunnerExecutionContext(txId, withSpilling, std::move(wakeUp))
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
, WithSpilling_(withSpilling)
{
}

Expand All @@ -26,6 +27,13 @@ class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
{
return KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
}

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override {
return TDqTaskRunnerExecutionContext::CreateChannelStorage(channelId, WithSpilling_ || withSpilling);
}

private:
bool WithSpilling_;
};

} // namespace NKqp
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ void TKqpComputeActor::DoBootstrap() {

auto wakeup = [this]{ ContinueExecute(); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling,
std::move(wakeup)));
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
auto& record = channelsInfoEv->Record;

for (auto& channelId : channelIds) {
FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion);
FillChannelDesc(TasksGraph, *record.AddUpdate(), TasksGraph.GetChannel(channelId), TasksGraph.GetMeta().ChannelTransportVersion, false);
}

LOG_T("Sending channels info to compute actor: " << computeActorId << ", channels: " << channelIds.size());
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ constexpr ui32 MAX_NON_PARALLEL_TASKS_EXECUTION_LIMIT = 4;

TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
Expand Down Expand Up @@ -289,7 +289,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);

THashMap<ui64, ui64> alreadyAssigned;
for(auto& [nodeId, tasks] : TasksPerNode) {
for(auto& [nodeId, tasks] : TasksPerNode) {
for(ui64 taskId: tasks) {
alreadyAssigned.emplace(taskId, nodeId);
}
Expand Down Expand Up @@ -366,7 +366,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op
task.ComputeActorId = computeActorId;

LOG_D("Executing task: " << taskId << " on compute actor: " << task.ComputeActorId);

auto result = PendingComputeActors.emplace(task.ComputeActorId, TProgressStat());
YQL_ENSURE(result.second);
}
Expand Down Expand Up @@ -409,7 +409,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
}
ComputeTasks.clear();
}

if (nComputeTasks == 0 && TasksPerNode.size() == 1 && (AsyncIoFactory != nullptr) && AllowSinglePartitionOpt) {
// query affects a single key or shard, so it might be more effective
// to execute this task locally so we can avoid useless overhead for remote task launching.
Expand Down Expand Up @@ -518,8 +518,8 @@ ui32 TKqpPlanner::CalcSendMessageFlagsForNode(ui32 nodeId) {
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool doOptimization,
const TIntrusivePtr<TUserRequestContext>& userRequestContext)
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class TKqpPlanner {
public:
TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& ExecuterSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
bool useDataQueryPool, bool localComputeTasks, ui64 mkqlMemoryLimit, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, bool allowSinglePartitionOpt,
const TIntrusivePtr<TUserRequestContext>& userRequestContext);
Expand All @@ -63,7 +63,7 @@ class TKqpPlanner {
ui32 GetnComputeTasks();

private:

const IKqpGateway::TKqpSnapshot& GetSnapshot() const;
void ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool optimizeProtoForLocalExecution);
void PrepareToProcess();
Expand Down Expand Up @@ -114,8 +114,8 @@ class TKqpPlanner {
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
const Ydb::Table::QueryStatsCollection::Mode& statsMode,
bool withSpilling, const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
const Ydb::Table::QueryStatsCollection::Mode& statsMode, bool withSpilling,
const TMaybe<NKikimrKqp::TRlPath>& rlPath, NWilson::TSpan& executerSpan,
TVector<NKikimrKqp::TKqpNodeResources>&& resourcesSnapshot,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& ExecuterRetriesConfig,
bool useDataQueryPool, bool localComputeTasks,
Expand Down
15 changes: 10 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,13 @@ void FillEndpointDesc(NDqProto::TEndpoint& endpoint, const TTask& task) {
}

void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NDqProto::TChannel& channelDesc, const TChannel& channel,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion) {
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling) {
channelDesc.SetId(channel.Id);
channelDesc.SetSrcStageId(channel.SrcStageId.StageId);
channelDesc.SetDstStageId(channel.DstStageId.StageId);
channelDesc.SetSrcTaskId(channel.SrcTask);
channelDesc.SetDstTaskId(channel.DstTask);
channelDesc.SetEnableSpilling(enableSpilling);

const auto& resultChannelProxies = tasksGraph.GetMeta().ResultChannelProxies;

Expand Down Expand Up @@ -955,7 +956,7 @@ void FillTaskMeta(const TStageInfo& stageInfo, const TTask& task, NYql::NDqProto
}
}

void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output) {
void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output, bool enableSpilling) {
switch (output.Type) {
case TTaskOutputType::Map:
YQL_ENSURE(output.Channels.size() == 1);
Expand Down Expand Up @@ -1015,7 +1016,7 @@ void FillOutputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskOutpu

for (auto& channel : output.Channels) {
auto& channelDesc = *outputDesc.AddChannels();
FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion);
FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion, enableSpilling);
}
}

Expand Down Expand Up @@ -1067,7 +1068,7 @@ void FillInputDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TTaskInput&

for (ui64 channel : input.Channels) {
auto& channelDesc = *inputDesc.AddChannels();
FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion);
FillChannelDesc(tasksGraph, channelDesc, tasksGraph.GetChannel(channel), tasksGraph.GetMeta().ChannelTransportVersion, false);
}

if (input.Transform) {
Expand Down Expand Up @@ -1116,8 +1117,12 @@ void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, N
FillInputDesc(tasksGraph, *result->AddInputs(), input, serializeAsyncIoSettings);
}

bool enableSpilling = false;
if (task.Outputs.size() > 1) {
enableSpilling = AppData()->EnableKqpSpilling;
}
for (const auto& output : task.Outputs) {
FillOutputDesc(tasksGraph, *result->AddOutputs(), output);
FillOutputDesc(tasksGraph, *result->AddOutputs(), output, enableSpilling);
}

const NKqpProto::TKqpPhyStage& stage = stageInfo.Meta.GetStage(stageInfo.Id);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, const TStageInfo& stageIn
NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(TKqpTasksGraph& tasksGraph, const TTask& task, bool serializeAsyncIoSettings);
void SerializeTaskToProto(const TKqpTasksGraph& tasksGraph, const TTask& task, NYql::NDqProto::TDqTask* message, bool serializeAsyncIoSettings);
void FillTableMeta(const TStageInfo& stageInfo, NKikimrTxDataShard::TKqpTransaction_TTableMeta* meta);
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc,
const NYql::NDq::TChannel& channel, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion);
void FillChannelDesc(const TKqpTasksGraph& tasksGraph, NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, bool enableSpilling);

template<typename Proto>
TVector<TTaskMeta::TColumn> BuildKqpColumns(const Proto& op, TIntrusiveConstPtr<TTableConstInfo> tableInfo) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/datashard_kqp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1105,11 +1105,11 @@ class TKqpTaskRunnerExecutionContext : public NDq::IDqTaskRunnerExecutionContext
return NKqp::KqpBuildOutputConsumer(outputDesc, type, applyCtx, typeEnv, holderFactory, std::move(outputs));
}

NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */) const override {
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */) const override {
return {};
}

NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override {
NDq::IDqChannelStorage::TPtr CreateChannelStorage(ui64 /* channelId */, bool /* withSpilling */, TActorSystem* /* actorSystem */, bool /*isConcurrent*/) const override {
return {};
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
Become(&TDqAsyncComputeActor::StateFuncWrapper<&TDqAsyncComputeActor::StateFuncBody>);

auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(
TxId, RuntimeSettings.UseSpilling, std::move(wakeup));
std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::make_shared<TDqTaskRunnerExecutionContext>(TxId, std::move(wakeup));

Send(TaskRunnerActorId,
new NTaskRunnerActor::TEvTaskRunnerCreate(
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class TDqComputeActor : public TDqComputeActorBase<TDqComputeActor> {
auto taskRunner = TaskRunnerFactory(Task, RuntimeSettings.StatsMode, logger);
SetTaskRunner(taskRunner);
auto wakeup = [this]{ ContinueExecute(EResumeSource::CABootstrapWakeup); };
TDqTaskRunnerExecutionContext execCtx(TxId, RuntimeSettings.UseSpilling, std::move(wakeup));
TDqTaskRunnerExecutionContext execCtx(TxId, std::move(wakeup));
PrepareTaskRunner(execCtx);

ContinueExecute(EResumeSource::CABootstrap);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct TEvDqCompute {
struct TEvStateRequest : public NActors::TEventPB<TEvStateRequest, NDqProto::TEvComputeStateRequest, TDqComputeEvents::EvStateRequest> {};

struct TEvResumeExecution : public NActors::TEventLocal<TEvResumeExecution, TDqComputeEvents::EvResumeExecution> {
TEvResumeExecution(EResumeSource source)
TEvResumeExecution(EResumeSource source)
: Source(source)
{ }

Expand Down
11 changes: 5 additions & 6 deletions ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,18 @@
namespace NYql {
namespace NDq {

TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
TDqTaskRunnerExecutionContext::TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TxId_(txId)
, WakeUp_(std::move(wakeUp))
, WithSpilling_(withSpilling)
{
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId) const {
return CreateChannelStorage(channelId, nullptr, false);
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling) const {
return CreateChannelStorage(channelId, withSpilling, nullptr, false);
}

IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const {
if (WithSpilling_) {
IDqChannelStorage::TPtr TDqTaskRunnerExecutionContext::CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const {
if (withSpilling) {
return CreateDqChannelStorage(TxId_, channelId, WakeUp_, actorSystem, isConcurrent);
} else {
return nullptr;
Expand Down
7 changes: 3 additions & 4 deletions ydb/library/yql/dq/actors/compute/dq_task_runner_exec_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ namespace NDq {

class TDqTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContextBase {
public:
TDqTaskRunnerExecutionContext(TTxId txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp);
TDqTaskRunnerExecutionContext(TTxId txId, IDqChannelStorage::TWakeUpCallback&& wakeUp);

IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, NActors::TActorSystem* actorSystem, bool isConcurrent) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling) const override;
IDqChannelStorage::TPtr CreateChannelStorage(ui64 channelId, bool withSpilling, NActors::TActorSystem* actorSystem, bool isConcurrent) const override;

private:
const TTxId TxId_;
const IDqChannelStorage::TWakeUpCallback WakeUp_;
const bool WithSpilling_;
};

} // namespace NDq
Expand Down
59 changes: 47 additions & 12 deletions ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,53 @@ using namespace NActors;

namespace {

#define LOG_D(s) \
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
#define LOG_I(s) \
LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
#define LOG_E(s) \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
#define LOG_C(s) \
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
#define LOG_W(s) \
LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
#define LOG_T(s) \
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
#define LOG_D(s) { \
if (ActorSystem_) { \
LOG_DEBUG_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} else { \
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} \
}

#define LOG_I(s) { \
if (ActorSystem_) { \
LOG_INFO_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} else { \
LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} \
}

#define LOG_E(s) { \
if (ActorSystem_) { \
LOG_ERROR_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} else { \
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} \
}

#define LOG_C(s) { \
if (ActorSystem_) { \
LOG_CRIT_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} else { \
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} \
}

#define LOG_W(s) { \
if (ActorSystem_) { \
LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} else { \
LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
} \
}

#define LOG_T(s) { \
if (ActorSystem_) { \
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} else { \
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
} \
}

constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10;
constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;
Expand Down
Loading