diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index dcc2cec7abba..5433ad33c9f7 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -145,8 +145,6 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info) , DataTxProfileLogThresholdMs(0, 0, 86400000) , DataTxProfileBufferThresholdMs(0, 0, 86400000) , DataTxProfileBufferSize(0, 1000, 100) - , ReadColumnsScanEnabled(1, 0, 1) - , ReadColumnsScanInUserPool(0, 0, 1) , BackupReadAheadLo(0, 0, 64*1024*1024) , BackupReadAheadHi(0, 0, 128*1024*1024) , TtlReadAheadLo(0, 0, 64*1024*1024) @@ -314,9 +312,6 @@ void TDataShard::IcbRegister() { appData->Icb->RegisterSharedControl(HighDataSizeReportThreshlodBytes, "DataShardControls.HighDataSizeReportThreshlodBytes"); appData->Icb->RegisterSharedControl(HighDataSizeReportIntervalSeconds, "DataShardControls.HighDataSizeReportIntervalSeconds"); - appData->Icb->RegisterSharedControl(ReadColumnsScanEnabled, "DataShardControls.ReadColumnsScanEnabled"); - appData->Icb->RegisterSharedControl(ReadColumnsScanInUserPool, "DataShardControls.ReadColumnsScanInUserPool"); - appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo"); appData->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi"); diff --git a/ydb/core/tx/datashard/datashard__read_columns.cpp b/ydb/core/tx/datashard/datashard__read_columns.cpp index aed10af0c5fd..d3f409dc6e77 100644 --- a/ydb/core/tx/datashard/datashard__read_columns.cpp +++ b/ydb/core/tx/datashard/datashard__read_columns.cpp @@ -184,7 +184,6 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase< bool InclusiveTo; ui64 RowsLimit = 100000; ui64 BytesLimit = 1024*1024; - ui64 Restarts = 0; TRowVersion ReadVersion = TRowVersion::Max(); public: @@ -200,18 +199,7 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase< TTxType GetTxType() const override { return TXTYPE_READ_COLUMNS; } - bool Precharge(NTable::TDatabase& db, ui32 localTid, const TVector& valueColumns) { - bool ready = db.Precharge(localTid, - KeyFrom, - KeyTo, - valueColumns, - 0, - RowsLimit, BytesLimit, - NTable::EDirection::Forward, ReadVersion); - return ready; - } - - bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { + bool Execute(TTransactionContext&, const TActorContext& ctx) override { // FIXME: we need to transform HEAD into some non-repeatable snapshot here if (!ReadVersion.IsMax() && Self->GetVolatileTxManager().HasVolatileTxsAtSnapshot(ReadVersion)) { Self->GetVolatileTxManager().AttachWaitingSnapshotEvent( @@ -223,38 +211,13 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase< Result = new TEvDataShard::TEvReadColumnsResponse(Self->TabletID()); - bool useScan = Self->ReadColumnsScanEnabled; - if (Self->IsFollower()) { - NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::OK; - TString errMessage; - - if (!Self->SyncSchemeOnFollower(txc, ctx, status, errMessage)) - return false; - - if (status != NKikimrTxDataShard::TError::OK) { - SetError(status, errMessage); - return true; - } - - if (!ReadVersion.IsMax()) { - NIceDb::TNiceDb db(txc.DB); - TRowVersion lastCompleteTx; - if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteStep, lastCompleteTx.Step)) - return false; - if (!TDataShard::SysGetUi64(db, Schema::Sys_LastCompleteTx, lastCompleteTx.TxId)) - return false; - - if (ReadVersion > lastCompleteTx) { - SetError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, - TStringBuilder() << "RO replica last version " << lastCompleteTx - << " lags behind the requested snapshot " << ReadVersion - << " shard " << Self->TabletID()); - return true; - } - } + // Note: this request is no longer supported, and it has never been used with followers + NKikimrTxDataShard::TError::EKind status = NKikimrTxDataShard::TError::WRONG_SHARD_STATE; + TString errMessage = "followers are not supported"; - useScan = false; + SetError(status, errMessage); + return true; } LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Read columns: " << Ev->Get()->Record); @@ -330,26 +293,6 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase< TSerializedCellVec toKeyCells; - if (!useScan) { - // Use histogram to limit the range for single request - const auto& sizeHistogram = tableInfo.Stats.DataStats.DataSizeHistogram; - auto histIt = LowerBound(sizeHistogram.begin(), sizeHistogram.end(), fromKeyCells, - [&tableInfo] (const NTable::TBucket& bucket, const TSerializedCellVec& key) { - TSerializedCellVec bk(bucket.EndKey); - return CompareTypedCellVectors( - bk.GetCells().data(), key.GetCells().data(), - tableInfo.KeyColumnTypes.data(), - bk.GetCells().size(), key.GetCells().size()) < 0; - }); - - if (histIt != sizeHistogram.end() && ++histIt != sizeHistogram.end()) { - toKeyCells.Parse(histIt->EndKey); - for (ui32 i = 0; i < toKeyCells.GetCells().size(); ++i) { - KeyTo.push_back(TRawTypeValue(toKeyCells.GetCells()[i].AsRef(), tableInfo.KeyColumnTypes[i])); - } - } - } - TVector valueColumns; TVector valueColumnTypes; TVector> columns; @@ -384,121 +327,35 @@ class TDataShard::TTxReadColumns : public NTabletFlatExecutor::TTransactionBase< tableInfo.Stats.AccessTime = TAppData::TimeProvider->Now(); - if (useScan) { - if (snapshotKey) { - if (!Self->GetSnapshotManager().AcquireReference(*snapshotKey)) { - SetError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, - TStringBuilder() << "Table id " << tableId << " has no snapshot at " << ReadVersion - << " shard " << Self->TabletID() << (Self->IsFollower() ? " RO replica" : "")); - return true; - } - } - - auto* scan = new TReadColumnsScan(TKeyBoundary{fromKeyCells, InclusiveFrom}, - TKeyBoundary{toKeyCells, InclusiveTo}, - valueColumns, valueColumnTypes, - std::move(blockBuilder), RowsLimit, BytesLimit, - TKeyBoundary{tableInfo.Range.To, tableInfo.Range.ToInclusive}, - Ev->Sender, ctx.SelfID, - snapshotKey, - tableInfo.Path, - Self->TabletID()); - auto opts = TScanOptions() - .SetResourceBroker("scan", 10) - .SetSnapshotRowVersion(ReadVersion) - .SetActorPoolId(Self->ReadColumnsScanInUserPool ? AppData(ctx)->UserPoolId : AppData(ctx)->BatchPoolId) - .SetReadAhead(512*1024, 1024*1024) - .SetReadPrio(TScanOptions::EReadPrio::Low); - - ui64 cookie = -1; // Should be ignored - Self->QueueScan(localTableId, scan, cookie, opts); - - Result.Destroy(); // Scan is now responsible for sending the result - - return true; - } - - // TODO: make sure KeyFrom and KeyTo properly reference non-inline cells data - - if (!Precharge(txc.DB, localTableId, valueColumns)) - return false; - - size_t rows = 0; - size_t bytes = 0; - bool shardFinished = false; - - { - NTable::TKeyRange iterRange; - iterRange.MinKey = KeyFrom; - iterRange.MinInclusive = InclusiveFrom; - - auto iter = txc.DB.IterateRange(localTableId, iterRange, valueColumns, ReadVersion); - - TString lastKeySerialized; - bool lastKeyInclusive = true; - while (iter->Next(NTable::ENext::All) == NTable::EReady::Data) { - TDbTupleRef rowKey = iter->GetKey(); - lastKeySerialized = TSerializedCellVec::Serialize(rowKey.Cells()); - - // Compare current row with right boundary - int cmp = -1;// CompareTypedCellVectors(tuple.Columns, KeyTo.data(), tuple.Types, KeyTo.size()); - - if (cmp == 0 && KeyTo.size() < rowKey.ColumnCount) { - cmp = -1; - } - if (InclusiveTo) { - if (cmp > 0) - break; // Stop iff greater(cmp > 0) - } else { - if (cmp >= 0) - break; // Stop iff equal(cmp == 0) or greater(cmp > 0) - } - - // Skip erased row - if (iter->Row().GetRowState() == NTable::ERowOp::Erase) { - continue; - } - - TDbTupleRef rowValues = iter->GetValues(); - - blockBuilder->AddRow(rowKey, rowValues); - - rows++; - bytes = blockBuilder->Bytes(); - - if (rows >= RowsLimit || bytes >= BytesLimit) - break; - } - - // We don't want to do many restarts if pages weren't precharged - // So we just return whatever we read so far and the client can request more rows - if (iter->Last() == NTable::EReady::Page && rows < 1000 && bytes < 100000 && Restarts < 1) { - ++Restarts; - return false; - } - - if (iter->Last() == NTable::EReady::Gone) { - shardFinished = true; - lastKeySerialized = tableInfo.Range.To.GetBuffer(); - lastKeyInclusive = tableInfo.Range.ToInclusive; + if (snapshotKey) { + if (!Self->GetSnapshotManager().AcquireReference(*snapshotKey)) { + SetError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, + TStringBuilder() << "Table id " << tableId << " has no snapshot at " << ReadVersion + << " shard " << Self->TabletID() << (Self->IsFollower() ? " RO replica" : "")); + return true; } - - TString buffer = blockBuilder->Finish(); - buffer.resize(blockBuilder->Bytes()); - - Result->Record.SetBlocks(buffer); - Result->Record.SetLastKey(lastKeySerialized); - Result->Record.SetLastKeyInclusive(lastKeyInclusive); - Result->Record.SetEndOfShard(shardFinished); } - Self->IncCounter(COUNTER_READ_COLUMNS_ROWS, rows); - Self->IncCounter(COUNTER_READ_COLUMNS_BYTES, bytes); - - LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() - << " Read columns result for table [" << tableInfo.Path << "]: " - << rows << " rows, " << bytes << " bytes (event size " - << Result->Record.GetBlocks().size() << ") shardFinished: " << shardFinished); + auto* scan = new TReadColumnsScan(TKeyBoundary{fromKeyCells, InclusiveFrom}, + TKeyBoundary{toKeyCells, InclusiveTo}, + valueColumns, valueColumnTypes, + std::move(blockBuilder), RowsLimit, BytesLimit, + TKeyBoundary{tableInfo.Range.To, tableInfo.Range.ToInclusive}, + Ev->Sender, ctx.SelfID, + snapshotKey, + tableInfo.Path, + Self->TabletID()); + auto opts = TScanOptions() + .SetResourceBroker("scan", 10) + .SetSnapshotRowVersion(ReadVersion) + .SetActorPoolId(AppData(ctx)->BatchPoolId) + .SetReadAhead(512*1024, 1024*1024) + .SetReadPrio(TScanOptions::EReadPrio::Low); + + ui64 cookie = -1; // Should be ignored + Self->QueueScan(localTableId, scan, cookie, opts); + + Result.Destroy(); // Scan is now responsible for sending the result return true; } diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 872410ac2c18..bc3e8a3e4333 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2631,9 +2631,6 @@ class TDataShard TControlWrapper DataTxProfileBufferThresholdMs; TControlWrapper DataTxProfileBufferSize; - TControlWrapper ReadColumnsScanEnabled; - TControlWrapper ReadColumnsScanInUserPool; - TControlWrapper BackupReadAheadLo; TControlWrapper BackupReadAheadHi;