Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
Self->ProgressTxController->FinishPlannedTx(txId, txc);
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_PLANNED_TX_COMPLETED);
}
Self->ProgressTxInFlight = false;
if (!!Self->ProgressTxController->GetPlannedTx()) {
Self->EnqueueProgressTx(ctx);
}
return true;
}

Expand All @@ -76,10 +80,6 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
if (LastCompletedTx) {
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
}
Self->ProgressTxInFlight = false;
if (!!Self->ProgressTxController->GetPlannedTx()) {
Self->EnqueueProgressTx(ctx);
}
Self->SetupIndexation();
}
};
Expand Down
7 changes: 0 additions & 7 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,13 +476,6 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt

LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID());
TablesManager.DropTable(pathId, version, db);

// TODO: Allow to read old snapshots after DROP
TBlobGroupSelector dsGroupSelector(Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
THashSet<TWriteId> writesToAbort = InsertTable->DropPath(dbTable, pathId);

TryAbortWrites(db, dbTable, std::move(writesToAbort));
}

void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const NOlap::TSnapshot& version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void TChangesWithAppend::DoWriteIndexOnExecute(NColumnShard::TColumnShard* self,
}
const auto predRemoveDroppedTable = [self](const TWritePortionInfoWithBlobsResult& item) {
auto& portionInfo = item.GetPortionResult();
if (!!self && (!self->TablesManager.HasTable(portionInfo.GetPathId()) || self->TablesManager.GetTable(portionInfo.GetPathId()).IsDropped())) {
if (!!self && !self->TablesManager.HasTable(portionInfo.GetPathId(), false)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_inserted_data")("reason", "table_removed")("path_id", portionInfo.GetPathId());
return true;
} else {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/data_events/shard_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ namespace NKikimr::NEvWrite {

void TWritersController::OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId) {
WriteIds[WritesIndex.Inc() - 1] = TWriteIdForShard(shardId, writeId, writePartId);
Counters->OnCSReply(TMonotonic::Now() - StartInstant);
if (!WritesCount.Dec()) {
Counters->OnFullReply(TMonotonic::Now() - StartInstant);
auto req = MakeHolder<NLongTxService::TEvLongTxService::TEvAttachColumnShardWrites>(LongTxId);
for (auto&& i : WriteIds) {
req->AddWrite(i.GetShardId(), i.GetWriteId());
Expand All @@ -28,6 +30,7 @@ namespace NKikimr::NEvWrite {
}

void TWritersController::OnFail(const Ydb::StatusIds::StatusCode code, const TString& message) {
Counters->OnCSFailed(code);
NYql::TIssues issues;
issues.AddIssue(message);
LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(code, issues));
Expand Down
52 changes: 49 additions & 3 deletions ydb/core/tx/data_events/shard_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/wilson/wilson_profile_span.h>
#include <ydb/core/tx/columnshard/counters/common/owner.h>


namespace NKikimr::NEvWrite {
Expand All @@ -22,19 +23,64 @@ class TWriteIdForShard {
TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId)
: ShardId(shardId)
, WriteId(writeId)
, WritePartId(writePartId)
{
, WritePartId(writePartId) {
}
};

class TCSUploadCounters: public NColumnShard::TCommonCountersOwner {
private:
using TBase = NColumnShard::TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr RequestsCount;
NMonitoring::THistogramPtr CSReplyDuration;
NMonitoring::THistogramPtr FullReplyDuration;
NMonitoring::THistogramPtr BytesDistribution;
NMonitoring::THistogramPtr RowsDistribution;
NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
NMonitoring::TDynamicCounters::TCounterPtr BytesCount;
NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
public:
TCSUploadCounters()
: TBase("CSUpload")
, RequestsCount(TBase::GetDeriviative("Requests"))
, CSReplyDuration(TBase::GetHistogram("Replies/Shard/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 1)))
, FullReplyDuration(TBase::GetHistogram("Replies/Full/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 1)))
, BytesDistribution(TBase::GetHistogram("Requests/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1024)))
, RowsDistribution(TBase::GetHistogram("Requests/Rows", NMonitoring::ExponentialHistogram(15, 2, 16)))
, RowsCount(TBase::GetDeriviative("Rows"))
, BytesCount(TBase::GetDeriviative("Bytes"))
, FailsCount(TBase::GetDeriviative("Fails")) {

}

void OnRequest(const ui64 rows, const ui64 bytes) const {
BytesDistribution->Collect(bytes);
RowsDistribution->Collect(rows);
BytesCount->Add(bytes);
RowsCount->Add(rows);
}

void OnCSFailed(const Ydb::StatusIds::StatusCode /*code*/) {
FailsCount->Add(1);
}
};

void OnCSReply(const TDuration d) const {
CSReplyDuration->Collect(d.MilliSeconds());
}

void OnFullReply(const TDuration d) const {
FullReplyDuration->Collect(d.MilliSeconds());
}
};
// External transaction controller class
class TWritersController {
private:
TAtomicCounter WritesCount = 0;
TAtomicCounter WritesIndex = 0;
NActors::TActorIdentity LongTxActorId;
std::vector<TWriteIdForShard> WriteIds;
const TMonotonic StartInstant = TMonotonic::Now();
YDB_READONLY_DEF(NLongTxService::TLongTxId, LongTxId);
YDB_READONLY(std::shared_ptr<TCSUploadCounters>, Counters, std::make_shared<TCSUploadCounters>());
public:
using TPtr = std::shared_ptr<TWritersController>;

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/tx_proxy/rpc_long_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class TLongTxWriteBase : public TActorBootstrapped<TLongTxWriteImpl> {
ui32 writeIdx = 0;
for (auto& [shard, infos] : splittedData.GetShardsInfo()) {
for (auto&& shardInfo : infos) {
InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes());
sumBytes += shardInfo->GetBytes();
rowsCount += shardInfo->GetRowsCount();
this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), DedupId, shardInfo, ActorSpan, InternalController, ++writeIdx, NEvWrite::EModificationType::Replace));
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ namespace NKikimr {
RowsCount = TBase::GetDeriviative("Rows/Count");
PackageSize = TBase::GetHistogram("Rows/PackageSize", NMonitoring::ExponentialHistogram(15, 2, 10));

DurationToStartCommit = TBase::GetHistogram("ToStartCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToFinishCommit = TBase::GetHistogram("ToFinishCommit/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToStartWriting = TBase::GetHistogram("ToStartWriting/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));
DurationToTxStarted = TBase::GetHistogram("ToTxStarted/DurationMs", NMonitoring::ExponentialHistogram(15, 2, 10));

const google::protobuf::EnumDescriptor* descriptor = ::Ydb::StatusIds::StatusCode_descriptor();
for (ui32 i = 0; i < (ui32)descriptor->value_count(); ++i) {
auto vDescription = descriptor->value(i);
Expand Down
25 changes: 25 additions & 0 deletions ydb/core/tx/tx_proxy/upload_rows_common_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,31 @@ class TUploadCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
NMonitoring::THistogramPtr PackageSize;

NMonitoring::THistogramPtr DurationToStartCommit;
NMonitoring::THistogramPtr DurationToFinishCommit;
NMonitoring::THistogramPtr DurationToStartWriting;
NMonitoring::THistogramPtr DurationToTxStarted;

THashMap<TString, NMonitoring::TDynamicCounters::TCounterPtr> CodesCount;
public:
TUploadCounters();

void OnTxStarted(const TDuration d) const {
DurationToTxStarted->Collect(d.MilliSeconds());
}

void OnWritingStarted(const TDuration d) const {
DurationToStartWriting->Collect(d.MilliSeconds());
}

void OnStartCommit(const TDuration d) const {
DurationToStartCommit->Collect(d.MilliSeconds());
}

void OnFinishCommit(const TDuration d) const {
DurationToFinishCommit->Collect(d.MilliSeconds());
}

void OnRequest(const ui64 rowsCount) const {
RequestsCount->Add(1);
RowsCount->Add(rowsCount);
Expand Down Expand Up @@ -741,6 +762,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void WriteToColumnTable(const NActors::TActorContext& ctx) {
UploadCounters.OnWritingStarted(TAppData::TimeProvider->Now() - StartTime);
TString accessCheckError;
if (!CheckAccess(accessCheckError)) {
return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, LogPrefix() << accessCheckError, ctx);
Expand All @@ -765,6 +787,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit

void Handle(NLongTxService::TEvLongTxService::TEvBeginTxResult::TPtr& ev, const TActorContext& ctx) {
const auto* msg = ev->Get();
UploadCounters.OnTxStarted(TAppData::TimeProvider->Now() - StartTime);

if (msg->Record.GetStatus() != Ydb::StatusIds::SUCCESS) {
NYql::TIssues issues;
Expand Down Expand Up @@ -894,6 +917,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void CommitLongTx(const TActorContext& ctx) {
UploadCounters.OnStartCommit(TAppData::TimeProvider->Now() - StartTime);
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
ctx.Send(longTxServiceId, new NLongTxService::TEvLongTxService::TEvCommitTx(LongTxId), 0, 0, Span.GetTraceId());
TBase::Become(&TThis::StateWaitCommitLongTx);
Expand All @@ -908,6 +932,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
}

void Handle(NLongTxService::TEvLongTxService::TEvCommitTxResult::TPtr& ev, const NActors::TActorContext& ctx) {
UploadCounters.OnFinishCommit(TAppData::TimeProvider->Now() - StartTime);
const auto* msg = ev->Get();

if (msg->Record.GetStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down