Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
, DataTxProfileLogThresholdMs(0, 0, 86400000)
, DataTxProfileBufferThresholdMs(0, 0, 86400000)
, DataTxProfileBufferSize(0, 1000, 100)
, ReadColumnsScanEnabled(1, 0, 1)
, ReadColumnsScanInUserPool(0, 0, 1)
, BackupReadAheadLo(0, 0, 64*1024*1024)
, BackupReadAheadHi(0, 0, 128*1024*1024)
, TtlReadAheadLo(0, 0, 64*1024*1024)
Expand Down Expand Up @@ -314,9 +312,6 @@ void TDataShard::IcbRegister() {
appData->Icb->RegisterSharedControl(HighDataSizeReportThreshlodBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
appData->Icb->RegisterSharedControl(HighDataSizeReportIntervalSeconds, "DataShardControls.HighDataSizeReportIntervalSeconds");

appData->Icb->RegisterSharedControl(ReadColumnsScanEnabled, "DataShardControls.ReadColumnsScanEnabled");
appData->Icb->RegisterSharedControl(ReadColumnsScanInUserPool, "DataShardControls.ReadColumnsScanInUserPool");

appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
appData->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi");

Expand Down
207 changes: 32 additions & 175 deletions ydb/core/tx/datashard/datashard__read_columns.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<
bool InclusiveTo;
ui64 RowsLimit = 100000;
ui64 BytesLimit = 1024*1024;
ui64 Restarts = 0;
TRowVersion ReadVersion = TRowVersion::Max();

public:
Expand All @@ -200,18 +199,7 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<

TTxType GetTxType() const override { return TXTYPE_READ_COLUMNS; }

// Forwards a precharge request for this read to the local database.
//
// db           - database to issue the precharge against
// localTid     - local table id handed to db.Precharge
// valueColumns - tags of the value columns the read needs
//
// KeyFrom / KeyTo, RowsLimit / BytesLimit and ReadVersion are taken from this
// transaction's members, and the direction is always forward. The result of
// db.Precharge is returned unchanged; Execute() returns false when it is false.
// NOTE(review): the literal 0 argument is passed through as-is; its meaning is
// not visible from here -- confirm against NTable::TDatabase::Precharge.
bool Precharge(NTable::TDatabase& db, ui32 localTid, const TVector<NTable::TTag>& valueColumns) {
    return db.Precharge(
        localTid,
        KeyFrom, KeyTo,
        valueColumns,
        0,
        RowsLimit,
        BytesLimit,
        NTable::EDirection::Forward,
        ReadVersion);
}

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
bool Execute(TTransactionContext&, const TActorContext& ctx) override {
// FIXME: we need to transform HEAD into some non-repeatable snapshot here
if (!ReadVersion.IsMax() && Self->GetVolatileTxManager().HasVolatileTxsAtSnapshot(ReadVersion)) {
Self->GetVolatileTxManager().AttachWaitingSnapshotEvent(
Expand All @@ -223,38 +211,13 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<

Result = new TEvDataShard::TEvReadColumnsResponse(Self->TabletID());

bool useScan = Self->ReadColumnsScanEnabled;

if (Self->IsFollower()) {
NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK;
TString errMessage;

if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage))
return false;

if (status != NKikimrTxDataShard::TError::OK) {
SetError(status, errMessage);
return true;
}

if (!ReadVersion.IsMax()) {
NIceDb::TNiceDb db(txc.DB);
TRowVersion lastCompleteTx;
if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteStep, lastCompleteTx.Step))
return false;
if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteTx, lastCompleteTx.TxId))
return false;

if (ReadVersion > lastCompleteTx) {
SetError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE,
TStringBuilder() << "RO replica last version " << lastCompleteTx
<< " lags behind the requested snapshot " << ReadVersion
<< " shard " << Self->TabletID());
return true;
}
}
// Note: this request is no longer supported, and it has never been used with followers
NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::WRONG_SHARD_STATE;
TString errMessage = "followers are not supported";

useScan = false;
SetError(status, errMessage);
return true;
}

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Read columns: " << Ev->Get()->Record);
Expand Down Expand Up @@ -330,26 +293,6 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<

TSerializedCellVec toKeyCells;

if (!useScan) {
// Use histogram to limit the range for single request
const auto& sizeHistogram = tableInfo.Stats.DataStats.DataSizeHistogram;
auto histIt = LowerBound(sizeHistogram.begin(), sizeHistogram.end(), fromKeyCells,
[&tableInfo] (const NTable::TBucket& bucket, const TSerializedCellVec& key) {
TSerializedCellVec bk(bucket.EndKey);
return CompareTypedCellVectors(
bk.GetCells().data(), key.GetCells().data(),
tableInfo.KeyColumnTypes.data(),
bk.GetCells().size(), key.GetCells().size()) < 0;
});

if (histIt != sizeHistogram.end() && ++histIt != sizeHistogram.end()) {
toKeyCells.Parse(histIt->EndKey);
for (ui32 i = 0; i < toKeyCells.GetCells().size(); ++i) {
KeyTo.push_back(TRawTypeValue(toKeyCells.GetCells()[i].AsRef(), tableInfo.KeyColumnTypes[i]));
}
}
}

TVector<NTable::TTag> valueColumns;
TVector<NScheme::TTypeInfo> valueColumnTypes;
TVector<std::pair<TString, NScheme::TTypeInfo>> columns;
Expand Down Expand Up @@ -384,121 +327,35 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase<

tableInfo.Stats.AccessTime = TAppData::TimeProvider->Now();

if (useScan) {
if (snapshotKey) {
if (!Self->GetSnapshotManager().AcquireReference(*snapshotKey)) {
SetError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
TStringBuilder() << "Table id " << tableId << " has no snapshot at " << ReadVersion
<< " shard " << Self->TabletID() << (Self->IsFollower() ? " RO replica" : ""));
return true;
}
}

auto* scan = new TReadColumnsScan(TKeyBoundary{fromKeyCells, InclusiveFrom},
TKeyBoundary{toKeyCells, InclusiveTo},
valueColumns, valueColumnTypes,
std::move(blockBuilder), RowsLimit, BytesLimit,
TKeyBoundary{tableInfo.Range.To, tableInfo.Range.ToInclusive},
Ev->Sender, ctx.SelfID,
snapshotKey,
tableInfo.Path,
Self->TabletID());
auto opts = TScanOptions()
.SetResourceBroker("scan", 10)
.SetSnapshotRowVersion(ReadVersion)
.SetActorPoolId(Self->ReadColumnsScanInUserPool ? AppData(ctx)->UserPoolId : AppData(ctx)->BatchPoolId)
.SetReadAhead(512*1024, 1024*1024)
.SetReadPrio(TScanOptions::EReadPrio::Low);

ui64 cookie = -1; // Should be ignored
Self->QueueScan(localTableId, scan, cookie, opts);

Result.Destroy(); // Scan is now responsible for sending the result

return true;
}

// TODO: make sure KeyFrom and KeyTo properly reference non-inline cells data

if (!Precharge(txc.DB, localTableId, valueColumns))
return false;

size_t rows = 0;
size_t bytes = 0;
bool shardFinished = false;

{
NTable::TKeyRange iterRange;
iterRange.MinKey = KeyFrom;
iterRange.MinInclusive = InclusiveFrom;

auto iter = txc.DB.IterateRange(localTableId, iterRange, valueColumns, ReadVersion);

TString lastKeySerialized;
bool lastKeyInclusive = true;
while (iter->Next(NTable::ENext::All) == NTable::EReady::Data) {
TDbTupleRef rowKey = iter->GetKey();
lastKeySerialized = TSerializedCellVec::Serialize(rowKey.Cells());

// Compare current row with right boundary
int cmp = -1;// CompareTypedCellVectors(tuple.Columns, KeyTo.data(), tuple.Types, KeyTo.size());

if (cmp == 0 && KeyTo.size() < rowKey.ColumnCount) {
cmp = -1;
}
if (InclusiveTo) {
if (cmp > 0)
break; // Stop iff greater(cmp > 0)
} else {
if (cmp >= 0)
break; // Stop iff equal(cmp == 0) or greater(cmp > 0)
}

// Skip erased row
if (iter->Row().GetRowState() == NTable::ERowOp::Erase) {
continue;
}

TDbTupleRef rowValues = iter->GetValues();

blockBuilder->AddRow(rowKey, rowValues);

rows++;
bytes = blockBuilder->Bytes();

if (rows >= RowsLimit || bytes >= BytesLimit)
break;
}

// We don't want to do many restarts if pages weren't precharged
// So we just return whatever we read so far and the client can request more rows
if (iter->Last() == NTable::EReady::Page && rows < 1000 && bytes < 100000 && Restarts < 1) {
++Restarts;
return false;
}

if (iter->Last() == NTable::EReady::Gone) {
shardFinished = true;
lastKeySerialized = tableInfo.Range.To.GetBuffer();
lastKeyInclusive = tableInfo.Range.ToInclusive;
if (snapshotKey) {
if (!Self->GetSnapshotManager().AcquireReference(*snapshotKey)) {
SetError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
TStringBuilder() << "Table id " << tableId << " has no snapshot at " << ReadVersion
<< " shard " << Self->TabletID() << (Self->IsFollower() ? " RO replica" : ""));
return true;
}

TString buffer = blockBuilder->Finish();
buffer.resize(blockBuilder->Bytes());

Result->Record.SetBlocks(buffer);
Result->Record.SetLastKey(lastKeySerialized);
Result->Record.SetLastKeyInclusive(lastKeyInclusive);
Result->Record.SetEndOfShard(shardFinished);
}

Self->IncCounter(COUNTER_READ_COLUMNS_ROWS, rows);
Self->IncCounter(COUNTER_READ_COLUMNS_BYTES, bytes);

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID()
<< " Read columns result for table [" << tableInfo.Path << "]: "
<< rows << " rows, " << bytes << " bytes (event size "
<< Result->Record.GetBlocks().size() << ") shardFinished: " << shardFinished);
auto* scan = new TReadColumnsScan(TKeyBoundary{fromKeyCells, InclusiveFrom},
TKeyBoundary{toKeyCells, InclusiveTo},
valueColumns, valueColumnTypes,
std::move(blockBuilder), RowsLimit, BytesLimit,
TKeyBoundary{tableInfo.Range.To, tableInfo.Range.ToInclusive},
Ev->Sender, ctx.SelfID,
snapshotKey,
tableInfo.Path,
Self->TabletID());
auto opts = TScanOptions()
.SetResourceBroker("scan", 10)
.SetSnapshotRowVersion(ReadVersion)
.SetActorPoolId(AppData(ctx)->BatchPoolId)
.SetReadAhead(512*1024, 1024*1024)
.SetReadPrio(TScanOptions::EReadPrio::Low);

ui64 cookie = -1; // Should be ignored
Self->QueueScan(localTableId, scan, cookie, opts);

Result.Destroy(); // Scan is now responsible for sending the result

return true;
}
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2631,9 +2631,6 @@ class TDataShard
TControlWrapper DataTxProfileBufferThresholdMs;
TControlWrapper DataTxProfileBufferSize;

TControlWrapper ReadColumnsScanEnabled;
TControlWrapper ReadColumnsScanInUserPool;

TControlWrapper BackupReadAheadLo;
TControlWrapper BackupReadAheadHi;

Expand Down