Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ void TColumnShard::EnqueueBackgroundActivities(const bool periodic) {
StoragesManager->GetOperatorVerified(NOlap::IStoragesManager::DefaultStorageId);
StoragesManager->GetSharedBlobsManager()->GetStorageManagerVerified(NOlap::IStoragesManager::DefaultStorageId);
CSCounters.OnStartBackground();
SendPeriodicStats();

if (!TablesManager.HasPrimaryIndex()) {
AFL_NOTICE(NKikimrServices::TX_COLUMNSHARD)("problem", "Background activities cannot be started: no index at tablet");
Expand All @@ -537,7 +536,7 @@ class TChangesTask: public NConveyor::ITask {
TString ClassId;
NOlap::TSnapshot LastCompletedTx;
protected:
virtual bool DoExecute() override {
virtual TConclusionStatus DoExecute(const std::shared_ptr<NConveyor::ITask>& /*taskPtr*/) override {
NActors::TLogContextGuard g(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId));
{
NOlap::TConstructionContext context(*TxEvent->IndexInfo, Counters, LastCompletedTx);
Expand All @@ -547,7 +546,7 @@ class TChangesTask: public NConveyor::ITask {
}
}
TActorContext::AsActorContext().Send(ParentActorId, std::move(TxEvent));
return true;
return TConclusionStatus::Success();
}
public:
virtual TString GetTaskClassIdentifier() const override {
Expand Down
20 changes: 20 additions & 0 deletions ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#include <ydb/core/tx/data_events/write_data.h>
#include <ydb/core/formats/arrow/special_keys.h>

namespace NKikimr::NOlap::NReader {
class IApplyAction;
}

namespace NKikimr::NColumnShard {

struct TEvPrivate {
Expand Down Expand Up @@ -41,11 +45,27 @@ struct TEvPrivate {
EvExportCursorSaved,
EvExportSaveCursor,

EvTaskProcessedResult,

EvEnd
};

static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");

class TEvTaskProcessedResult: public NActors::TEventLocal<TEvTaskProcessedResult, EvTaskProcessedResult> {
private:
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>> Result;

public:
TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>> ExtractResult() {
return std::move(Result);
}

TEvTaskProcessedResult(const TConclusion<std::shared_ptr<NOlap::NReader::IApplyAction>>& result)
: Result(result) {
}
};

struct TEvTieringModified: public TEventLocal<TEvTieringModified, EvTieringModified> {
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TScanIteratorBase {
public:
virtual ~TScanIteratorBase() = default;

virtual void Apply(IDataTasksProcessor::ITask::TPtr /*processor*/) {
virtual void Apply(const std::shared_ptr<IApplyAction>& /*task*/) {

}

Expand Down
13 changes: 7 additions & 6 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,18 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
}
}

void TColumnShardScan::HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResult::TPtr& ev) {
--InFlightReads;
auto g = Stats->MakeGuard("task_result");
if (ev->Get()->GetErrorMessage()) {
ACFL_ERROR("event", "TEvTaskProcessedResult")("error", ev->Get()->GetErrorMessage());
SendScanError("task_error:" + ev->Get()->GetErrorMessage());
auto result = ev->Get()->ExtractResult();
if (result.IsFail()) {
ACFL_ERROR("event", "TEvTaskProcessedResult")("error", result.GetErrorMessage());
SendScanError("task_error:" + result.GetErrorMessage());
Finish(NColumnShard::TScanCounters::EStatusFinish::ConveyorInternalError);
} else {
ACFL_DEBUG("event", "TEvTaskProcessedResult");
auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult());
Y_DEBUG_ABORT_UNLESS(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()));
auto t = static_pointer_cast<IApplyAction>(result.GetResult());
Y_DEBUG_ABORT_UNLESS(dynamic_pointer_cast<IDataTasksProcessor::ITask>(result.GetResult()));
if (!ScanIterator->Finished()) {
ScanIterator->Apply(t);
}
Expand Down
33 changes: 16 additions & 17 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
#pragma once
#include <ydb/core/formats/arrow/converter.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/abstract.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/conveyor/usage/events.h>
#include <ydb/core/tx/tracing/usage/tracing.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_events.h>

#include <ydb/core/formats/arrow/converter.h>

#include <ydb/library/actors/core/log.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/chunks_limiter/chunks_limiter.h>

namespace NKikimr::NOlap::NReader {
Expand All @@ -22,6 +22,7 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
TActorId ReadCoordinatorActorId;
const std::shared_ptr<IStoragesManager> StoragesManager;
std::optional<TMonotonic> StartInstant;

public:
static constexpr auto ActorActivityType() {
return NKikimrServices::TActivity::KQP_OLAP_SCAN;
Expand All @@ -31,31 +32,29 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
virtual void PassAway() override;

TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId,
const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy,
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
const std::shared_ptr<IStoragesManager>& storagesManager, const TComputeShardingPolicy& computeShardingPolicy, ui32 scanId, ui64 txId,
ui32 scanGen, ui64 requestCookie, ui64 tabletId, TDuration timeout, const TReadMetadataBase::TConstPtr& readMetadataRange,
NKikimrDataEvents::EDataFormat dataFormat, const NColumnShard::TScanCounters& scanCountersPool);

void Bootstrap(const TActorContext& ctx);

private:
STATEFN(StateScan) {
auto g = Stats->MakeGuard("processing");
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN)
("SelfId", SelfId())("TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen)
);
TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_SCAN) ("SelfId", SelfId())(
"TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen));
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleScan);
hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleScan);
hFunc(TEvents::TEvUndelivered, HandleScan);
hFunc(TEvents::TEvWakeup, HandleScan);
hFunc(NConveyor::TEvExecution::TEvTaskProcessedResult, HandleScan);
hFunc(NColumnShard::TEvPrivate::TEvTaskProcessedResult, HandleScan);
default:
AFL_VERIFY(false)("unexpected_event", ev->GetTypeName());
}
}

void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev);
void HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResult::TPtr& ev);

void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev);

Expand All @@ -80,10 +79,10 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
class TScanStatsOwner: public NKqp::TEvKqpCompute::IShardScanStats {
private:
YDB_READONLY_DEF(TReadStats, Stats);

public:
TScanStatsOwner(const TReadStats& stats)
: Stats(stats) {

}

virtual THashMap<TString, ui64> GetMetrics() const override {
Expand Down Expand Up @@ -142,11 +141,11 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
TDuration ReadingDurationMax;
NMonitoring::THistogramPtr BlobDurationsCounter;
NMonitoring::THistogramPtr ByteDurationsCounter;

public:
TBlobStats(const NMonitoring::THistogramPtr blobDurationsCounter, const NMonitoring::THistogramPtr byteDurationsCounter)
: BlobDurationsCounter(blobDurationsCounter)
, ByteDurationsCounter(byteDurationsCounter) {

}
void Received(const TBlobRange& br, const TDuration d) {
ReadingDurationSum += d;
Expand Down Expand Up @@ -181,4 +180,4 @@ class TColumnShardScan: public TActorBootstrapped<TColumnShardScan>, NArrow::IRo
TDuration LastReportedElapsedTime;
};

}
} // namespace NKikimr::NOlap::NReader
18 changes: 16 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/common/conveyor_task.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
#include "conveyor_task.h"
#include <ydb/library/actors/core/actor.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>

namespace NKikimr::NOlap::NReader {

bool IDataTasksProcessor::ITask::Apply(IDataReader& indexedDataRead) const {
return DoApply(indexedDataRead);
NKikimr::TConclusionStatus IDataTasksProcessor::ITask::DoExecute(const std::shared_ptr<NConveyor::ITask>& taskPtr) {
auto result = DoExecuteImpl();
if (result.IsFail()) {
NActors::TActivationContext::AsActorContext().Send(OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(result));
} else {
NActors::TActivationContext::AsActorContext().Send(
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(static_pointer_cast<IDataTasksProcessor::ITask>(taskPtr)));
}
return result;
}

void IDataTasksProcessor::ITask::DoOnCannotExecute(const TString& reason) {
NActors::TActivationContext::AsActorContext().Send(
OwnerId, new NColumnShard::TEvPrivate::TEvTaskProcessedResult(TConclusionStatus::Fail(reason)));
}

}
36 changes: 27 additions & 9 deletions ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h
Original file line number Diff line number Diff line change
@@ -1,28 +1,46 @@
#pragma once

#include <ydb/core/tx/conveyor/usage/abstract.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/result.h>

namespace NKikimr::NOlap::NReader {

class IDataReader;

class IApplyAction {
protected:
virtual bool DoApply(IDataReader& indexedDataRead) const = 0;

public:
bool Apply(IDataReader& indexedDataRead) const {
return DoApply(indexedDataRead);
}
};

class IDataTasksProcessor {
public:
class ITask: public NConveyor::ITask {
class ITask: public NConveyor::ITask, public IApplyAction {
private:
using TBase = NConveyor::ITask;
protected:
virtual bool DoApply(IDataReader& indexedDataRead) const = 0;
public:
ITask(const std::optional<NActors::TActorId> ownerId = {})
: TBase(ownerId) {
const NActors::TActorId OwnerId;
virtual TConclusionStatus DoExecuteImpl() = 0;

}
protected:
virtual TConclusionStatus DoExecute(const std::shared_ptr<NConveyor::ITask>& taskPtr) override final;
virtual void DoOnCannotExecute(const TString& reason) override;

public:
using TPtr = std::shared_ptr<ITask>;
virtual ~ITask() = default;
bool Apply(IDataReader& indexedDataRead) const;

ITask(const NActors::TActorId& ownerId)
: OwnerId(ownerId)
{

}
};
};

}
} // namespace NKikimr::NOlap::NReader
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "constructor.h"
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>

namespace NKikimr::NOlap::NReader::NPlain {

Expand All @@ -14,7 +15,7 @@ bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& ra
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())
("status", status.GetErrorMessage())("status_code", status.GetStatus())("storage_id", storageId);
NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(),
std::make_unique<NConveyor::TEvExecution::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const {
return true;
}

bool TStepAction::DoExecute() {
TConclusionStatus TStepAction::DoExecuteImpl() {
if (Source->IsAborted()) {
return true;
return TConclusionStatus::Success();
}
auto executeResult = Cursor.Execute(Source);
if (!executeResult) {
SetErrorMessage(executeResult.GetErrorMessage());
return false;
return executeResult;
}
if (*executeResult) {
Source->Finalize();
FinishedFlag = true;
}
return true;
return TConclusionStatus::Success();
}

TConclusion<bool> TColumnBlobsFetchingStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ class TStepAction: public IDataTasksProcessor::ITask {
bool FinishedFlag = false;
protected:
virtual bool DoApply(IDataReader& owner) const override;
virtual bool DoExecute() override;
virtual TConclusionStatus DoExecuteImpl() override;

public:
virtual TString GetTaskClassIdentifier() const override {
return "STEP_ACTION";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ TColumnShardScanIterator::~TColumnShardScanIterator() {
ReadMetadata->ReadStats->PrintToLog();
}

void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
void TColumnShardScanIterator::Apply(const std::shared_ptr<IApplyAction>& task) {
if (!IndexedData->IsFinished()) {
Y_ABORT_UNLESS(task->Apply(*IndexedData));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TColumnShardScanIterator: public TScanIteratorBase {
;
}

virtual void Apply(IDataTasksProcessor::ITask::TPtr task) override;
virtual void Apply(const std::shared_ptr<IApplyAction>& task) override;

bool Finished() const override {
return IndexedData->IsFinished() && ReadyResults.empty();
Expand Down
Loading