Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1516,7 +1516,6 @@ message TColumnShardConfig {

optional TIndexMetadataMemoryLimit IndexMetadataMemoryLimit = 12;
optional bool CleanupEnabled = 13 [default = true];
optional uint32 RemovedPortionLivetimeSeconds = 14 [default = 600];

message TRepairInfo {
optional string ClassName = 1;
Expand Down
14 changes: 13 additions & 1 deletion ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
EnqueueBackgroundActivities();
BackgroundSessionsManager->Start();
ctx.Send(SelfId(), new TEvPrivate::TEvPeriodicWakeup());
ctx.Send(SelfId(), new TEvPrivate::TEvPingSnapshotsUsage());
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
AFL_VERIFY(!!StartInstant);
Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
Expand Down Expand Up @@ -161,7 +162,9 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
if (HasIndex()) {
index = &GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex();
}
InFlightReadsTracker.RemoveInFlightRequest(ev->Get()->RequestCookie, index);

InFlightReadsTracker.RemoveInFlightRequest(
ev->Get()->RequestCookie, index, TInstant::Now());

ui64 txId = ev->Get()->TxId;
if (ScanTxInFlight.contains(txId)) {
Expand All @@ -173,6 +176,14 @@ void TColumnShard::Handle(TEvPrivate::TEvReadFinished::TPtr& ev, const TActorCon
}
}

// Handler for the self-addressed TEvPingSnapshotsUsage event.
// Pings the in-flight reads tracker; if the tracker hands back a transaction,
// that transaction is executed. The event is then re-scheduled, so the handler
// keeps firing with a period of 0.3 * GetMaxReadStaleness().
void TColumnShard::Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& /*ev*/, const TActorContext& ctx) {
    // The check period handed to the tracker is derived from 0.6 of the max read
    // staleness and filtered through the column shard controller.
    const auto pingCheckPeriod =
        NYDBTest::TControllers::GetColumnShardController()->GetPingCheckPeriod(0.6 * GetMaxReadStaleness());

    auto pingTx = InFlightReadsTracker.Ping(this, pingCheckPeriod, TInstant::Now());
    if (pingTx) {
        // Ownership of the transaction object is passed to Execute().
        Execute(pingTx.release(), ctx);
    }

    ctx.Schedule(0.3 * GetMaxReadStaleness(), new TEvPrivate::TEvPingSnapshotsUsage());
}

void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Manual) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "TEvPrivate::TEvPeriodicWakeup::MANUAL")("tablet_id", TabletID());
Expand All @@ -182,6 +193,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC
SendWaitPlanStep(GetOutdatedStep());

SendPeriodicStats();
EnqueueBackgroundActivities();
ctx.Schedule(PeriodicWakeupActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
}
}
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
Self->SharingSessionsManager = local;
}
{
TMemoryProfileGuard g("TTxInit/TInFlightReadsTracker");
TInFlightReadsTracker local(Self->StoragesManager, Self->Counters.GetRequestsTracingCounters());
if (!local.LoadFromDatabase(txc.DB)) {
return false;
}
Self->InFlightReadsTracker = std::move(local);
}

Self->UpdateInsertTableCounters();
Self->UpdateIndexCounters();
Expand Down
29 changes: 19 additions & 10 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, TabletCountersHolder(new TProtobufTabletCounters<ESimpleCounters_descriptor, ECumulativeCounters_descriptor,
EPercentileCounters_descriptor, ETxTypes_descriptor>())
, Counters(*TabletCountersHolder)
, InFlightReadsTracker(StoragesManager)
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
Expand All @@ -84,8 +84,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, TTLTaskSubscription(NOlap::TTTLColumnEngineChanges::StaticTypeName(), Counters.GetSubscribeCounters())
, BackgroundController(Counters.GetBackgroundControllerCounters())
, NormalizerController(StoragesManager, Counters.GetSubscribeCounters())
, SysLocks(this)
, MaxReadStaleness(TDuration::MilliSeconds(AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms())) {
, SysLocks(this) {
}

void TColumnShard::OnDetach(const TActorContext& ctx) {
Expand Down Expand Up @@ -186,12 +185,18 @@ ui64 TColumnShard::GetOutdatedStep() const {
return step;
}

ui64 TColumnShard::GetMinReadStep() const {
const TDuration maxReadStaleness = NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(MaxReadStaleness);
ui64 delayMillisec = maxReadStaleness.MilliSeconds();
NOlap::TSnapshot TColumnShard::GetMinReadSnapshot() const {
ui64 delayMillisec = GetMaxReadStaleness().MilliSeconds();
ui64 passedStep = GetOutdatedStep();
ui64 minReadStep = (passedStep > delayMillisec ? passedStep - delayMillisec : 0);
return minReadStep;
Counters.GetRequestsTracingCounters()->OnDefaultMinSnapshotInstant(TInstant::MilliSeconds(minReadStep));

if (auto ssClean = InFlightReadsTracker.GetSnapshotToClean()) {
if (ssClean->GetPlanStep() < minReadStep) {
return *ssClean;
}
}
return NOlap::TSnapshot::MaxForPlanStep(minReadStep);
}

TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const {
Expand Down Expand Up @@ -785,9 +790,8 @@ void TColumnShard::SetupCleanupPortions() {
return;
}

NOlap::TSnapshot cleanupSnapshot{GetMinReadStep(), 0};

auto changes = TablesManager.MutablePrimaryIndex().StartCleanupPortions(cleanupSnapshot, TablesManager.GetPathsToDrop(), DataLocksManager);
auto changes =
TablesManager.MutablePrimaryIndex().StartCleanupPortions(GetMinReadSnapshot(), TablesManager.GetPathsToDrop(), DataLocksManager);
if (!changes) {
ACFL_DEBUG("background", "cleanup")("skip_reason", "no_changes");
return;
Expand Down Expand Up @@ -1134,4 +1138,9 @@ const NKikimr::NColumnShard::NTiers::TManager* TColumnShard::GetTierManagerPoint
return Tiers->GetManagerOptional(tierId);
}

// Returns the maximum read staleness: the MaxReadStaleness_ms value from the
// column shard config, converted to a TDuration and passed through the column
// shard controller's GetReadTimeoutClean() hook.
TDuration TColumnShard::GetMaxReadStaleness() {
    const auto configuredMs = AppDataVerified().ColumnShardConfig.GetMaxReadStaleness_ms();
    const TDuration configuredStaleness = TDuration::MilliSeconds(configuredMs);
    return NYDBTest::TControllers::GetColumnShardController()->GetReadTimeoutClean(configuredStaleness);
}

}
8 changes: 6 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ class TColumnShard
void Handle(TEvPrivate::TEvScanStats::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvReadFinished::TPtr &ev, const TActorContext &ctx);
void Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvPingSnapshotsUsage::TPtr& ev, const TActorContext& ctx);

void Handle(TEvPrivate::TEvWriteIndex::TPtr& ev, const TActorContext& ctx);
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
void Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -361,6 +363,8 @@ class TColumnShard
HFunc(TEvPrivate::TEvScanStats, Handle);
HFunc(TEvPrivate::TEvReadFinished, Handle);
HFunc(TEvPrivate::TEvPeriodicWakeup, Handle);
HFunc(TEvPrivate::TEvPingSnapshotsUsage, Handle);

HFunc(NEvents::TDataEvents::TEvWrite, Handle);
HFunc(TEvPrivate::TEvWriteDraft, Handle);
HFunc(TEvPrivate::TEvGarbageCollectionFinished, Handle);
Expand Down Expand Up @@ -465,7 +469,7 @@ class TColumnShard
TLimits Limits;
NOlap::TNormalizationController NormalizerController;
NDataShard::TSysLocks SysLocks;
const TDuration MaxReadStaleness;
static TDuration GetMaxReadStaleness();

void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();
Expand All @@ -475,7 +479,7 @@ class TColumnShard
void SendWaitPlanStep(ui64 step);
void RescheduleWaitingReads();
NOlap::TSnapshot GetMaxReadVersion() const;
ui64 GetMinReadStep() const;
NOlap::TSnapshot GetMinReadSnapshot() const;
ui64 GetOutdatedStep() const;
TDuration GetTxCompleteLag() const {
ui64 mediatorTime = MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0;
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/columnshard/columnshard_private_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ struct TEvPrivate {
EvExportSaveCursor,

EvTaskProcessedResult,
EvPingSnapshotsUsage,

EvEnd
};
Expand Down Expand Up @@ -158,7 +159,11 @@ struct TEvPrivate {
bool Manual;
};

class TEvWriteBlobsResult : public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
// Payload-free local event tagged with EvPingSnapshotsUsage.
// It carries no data; receiving it is the whole signal.
struct TEvPingSnapshotsUsage: public TEventLocal<TEvPingSnapshotsUsage, EvPingSnapshotsUsage> {
TEvPingSnapshotsUsage() = default;
};

class TEvWriteBlobsResult: public TEventLocal<TEvWriteBlobsResult, EvWriteBlobsResult> {
private:
NColumnShard::TBlobPutResult::TPtr PutResult;
NOlap::TWritingBuffer WritesBuffer;
Expand Down
14 changes: 12 additions & 2 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ struct Schema : NIceDb::Schema {
TableVersionInfo = 11,
SmallBlobs = 12,
OneToOneEvictedBlobs = 13,
BlobsToDeleteWT = 14
BlobsToDeleteWT = 14,
InFlightSnapshots = 15
};

// Tablet tables
Expand Down Expand Up @@ -250,6 +251,14 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<BlobId, TabletId>;
};

// Local-database table registered under ECommonTables::InFlightSnapshots.
// Each row is just a (PlanStep, TxId) pair: both columns are Uint64 and both
// are part of the key, so the table has no non-key payload.
// NOTE(review): presumably one row per snapshot held by in-flight reads — confirm against the tracker code.
struct InFlightSnapshots: Table<(ui32)ECommonTables::InFlightSnapshots> {
// Column ids (1, 2) are part of the stored schema.
struct PlanStep: Column<1, NScheme::NTypeIds::Uint64> {};
struct TxId: Column<2, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<PlanStep, TxId>;
using TColumns = TableColumns<PlanStep, TxId>;
};

// Index tables

// InsertTable - common for all indices
Expand Down Expand Up @@ -545,7 +554,8 @@ struct Schema : NIceDb::Schema {
BackgroundSessions,
ShardingInfo,
Normalizers,
NormalizerEvents
NormalizerEvents,
InFlightSnapshots
>;

//
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/common/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ TString TSnapshot::SerializeToString() const {
return SerializeToProto().SerializeAsString();
}

// Builds the snapshot with the given plan step and the largest possible ui64
// as its second component.
NKikimr::NOlap::TSnapshot TSnapshot::MaxForPlanStep(const ui64 planStep) noexcept {
    const ui64 maxTxId = ::Max<ui64>();
    return TSnapshot(planStep, maxTxId);
}

// Same as MaxForPlanStep(), with the plan step taken from the instant's
// millisecond value.
NKikimr::NOlap::TSnapshot TSnapshot::MaxForPlanInstant(const TInstant planInstant) noexcept {
    return MaxForPlanStep(planInstant.MilliSeconds());
}

};
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/common/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class TSnapshot {
return TSnapshot(-1ll, -1ll);
}

static TSnapshot MaxForPlanInstant(const TInstant planInstant) noexcept;

static TSnapshot MaxForPlanStep(const ui64 planStep) noexcept;

constexpr bool operator==(const TSnapshot&) const noexcept = default;

constexpr auto operator<=>(const TSnapshot&) const noexcept = default;
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/counters/counters_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "counters_manager.h"

namespace NKikimr::NColumnShard {

} // namespace NKikimr::NColumnShard
22 changes: 13 additions & 9 deletions ydb/core/tx/columnshard/counters/counters_manager.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
#pragma once

#include "background_controller.h"
#include "column_tables.h"
#include "columnshard.h"
#include "indexation.h"
#include "req_tracer.h"
#include "scan.h"
#include "column_tables.h"
#include "writes_monitor.h"
#include "tablet_counters.h"
#include "background_controller.h"
#include "writes_monitor.h"

#include <ydb/core/tx/columnshard/engines/column_engine.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/protos/table_stats.pb.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/counters_columnshard.pb.h>
#include <ydb/core/protos/counters_datashard.pb.h>
#include <ydb/core/base/appdata_fwd.h>
#include <ydb/core/protos/table_stats.pb.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/core/tx/columnshard/engines/column_engine.h>

#include <library/cpp/time_provider/time_provider.h>

namespace NKikimr::NColumnShard {
Expand All @@ -32,6 +34,7 @@ class TCountersManager {
YDB_READONLY(TIndexationCounters, IndexationCounters, TIndexationCounters("Indexation"));
YDB_READONLY(TIndexationCounters, CompactionCounters, TIndexationCounters("GeneralCompaction"));
YDB_READONLY(TScanCounters, ScanCounters, TScanCounters("Scan"));
YDB_READONLY_DEF(std::shared_ptr<TRequestsTracerCounters>, RequestsTracingCounters);
YDB_READONLY_DEF(std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>, SubscribeCounters);

public:
Expand All @@ -40,8 +43,9 @@ class TCountersManager {
, WritesMonitor(std::make_shared<TWritesMonitor>(tabletCounters))
, BackgroundControllerCounters(std::make_shared<TBackgroundControllerCounters>())
, ColumnTablesCounters(std::make_shared<TColumnTablesCounters>())
, RequestsTracingCounters(std::make_shared<TRequestsTracerCounters>())
, SubscribeCounters(std::make_shared<NOlap::NResourceBroker::NSubscribe::TSubscriberCounters>()) {
}
};

} // namespace NKikimr::NColumnShard
} // namespace NKikimr::NColumnShard
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/counters/req_tracer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "req_tracer.h"

namespace NKikimr::NColumnShard {

}
51 changes: 51 additions & 0 deletions ydb/core/tx/columnshard/counters/req_tracer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#pragma once
#include "common/owner.h"
#include <ydb/core/tx/columnshard/common/snapshot.h>

namespace NKikimr::NColumnShard {

class TRequestsTracerCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr RequestedMinSnapshotAge;
NMonitoring::TDynamicCounters::TCounterPtr DefaultMinSnapshotAge;
NMonitoring::TDynamicCounters::TCounterPtr SnapshotsCount;
NMonitoring::TDynamicCounters::TCounterPtr SnapshotLock;
NMonitoring::TDynamicCounters::TCounterPtr SnapshotUnlock;

public:

TRequestsTracerCounters()
: TBase("cs_requests_tracing")
, RequestedMinSnapshotAge(TBase::GetValue("Snapshots/RequestedAge/Seconds"))
, DefaultMinSnapshotAge(TBase::GetValue("Snapshots/DefaultAge/Seconds"))
, SnapshotsCount(TBase::GetValue("Snapshots/Count"))
, SnapshotLock(TBase::GetDeriviative("Snapshots/Lock"))
, SnapshotUnlock(TBase::GetDeriviative("Snapshots/Unlock"))
{

}

void OnDefaultMinSnapshotInstant(const TInstant instant) const {
DefaultMinSnapshotAge->Set((TInstant::Now() - instant).Seconds());
}

void OnSnapshotsInfo(const ui32 count, const std::optional<NOlap::TSnapshot> snapshotPlanStep) const {
if (snapshotPlanStep) {
RequestedMinSnapshotAge->Set((TInstant::Now() - snapshotPlanStep->GetPlanInstant()).Seconds());
} else {
RequestedMinSnapshotAge->Set(0);
}
SnapshotsCount->Set(count);

}

void OnSnapshotLocked() const {
SnapshotLock->Add(1);
}
void OnSnapshotUnlocked() const {
SnapshotUnlock->Add(1);
}
};

}
12 changes: 7 additions & 5 deletions ydb/core/tx/columnshard/counters/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ LIBRARY()

SRCS(
background_controller.cpp
column_tables.cpp
indexation.cpp
scan.cpp
engine_logs.cpp
counters_manager.cpp
blobs_manager.cpp
column_tables.cpp
columnshard.cpp
insert_table.cpp
common_data.cpp
engine_logs.cpp
indexation.cpp
insert_table.cpp
req_tracer.cpp
scan.cpp
splitter.cpp
)

Expand Down
Loading