Merged
134 changes: 87 additions & 47 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -102,14 +102,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
}
}

auto affectedShards = Reads.AffectedShards();
// TODO: use evread statistics after KIKIMR-16924
tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate);
tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate);
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + affectedShards.size());

NKqpProto::TKqpTableExtraStats tableExtraStats;
auto readActorTableAggrExtraStats = tableExtraStats.MutableReadActorTableAggrExtraStats();
for (const auto& [shardId, _] : ReadsPerShard) {
for (const auto& shardId : affectedShards) {
readActorTableAggrExtraStats->AddAffectedShards(shardId);
}

@@ -140,10 +141,6 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
, ShardId(shardId)
, State(EReadState::Initial) {}

void SetFinished() {
State = EReadState::Finished;
}

bool Finished() const {
return (State == EReadState::Finished);
}
@@ -162,7 +159,68 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

struct TShardState {
ui64 RetryAttempts = 0;
std::vector<TReadState*> Reads;
std::unordered_set<ui64> Reads;
};

struct TReads {
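// Wraps what used to be the actor's separate Reads / ReadsPerShard members: owns every in-flight TReadState and keeps the per-shard retry counters and read-id sets consistent on insert()/erase().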
std::unordered_map<ui64, TReadState> Reads;
std::unordered_map<ui64, TShardState> ReadsPerShard;

std::unordered_map<ui64, TReadState>::iterator begin() { return Reads.begin(); }

std::unordered_map<ui64, TReadState>::iterator end() { return Reads.end(); }

std::unordered_map<ui64, TReadState>::iterator find(ui64 readId) {
return Reads.find(readId);
}

void insert(TReadState&& read) {
const auto [readIt, succeeded] = Reads.insert({read.Id, std::move(read)});
YQL_ENSURE(succeeded);
ReadsPerShard[readIt->second.ShardId].Reads.emplace(readIt->second.Id);
}

size_t InFlightReads() const {
return Reads.size();
}

std::vector<ui64> AffectedShards() const {
std::vector<ui64> result;
result.reserve(ReadsPerShard.size());
for(const auto& [shard, _]: ReadsPerShard) {
result.push_back(shard);
}
return result;
}

bool CheckShardRetriesExeeded(TReadState& failedRead) {
const auto& shardState = ReadsPerShard[failedRead.ShardId];
return shardState.RetryAttempts + 1 > MaxShardRetries();
}

TDuration CalcDelayForShard(TReadState& failedRead, bool allowInstantRetry) {
auto& shardState = ReadsPerShard[failedRead.ShardId];
++shardState.RetryAttempts;
return CalcDelay(shardState.RetryAttempts, allowInstantRetry);
}

void erase(TReadState& read) {
ReadsPerShard[read.ShardId].Reads.erase(read.Id);
Reads.erase(read.Id);
}

std::vector<TReadState*> GetShardReads(ui64 shardId) {
auto it = ReadsPerShard.find(shardId);
YQL_ENSURE(it != ReadsPerShard.end());
std::vector<TReadState*> result;
for(ui64 readId: it->second.Reads) {
auto it = Reads.find(readId);
YQL_ENSURE(it != Reads.end());
result.push_back(&it->second);
}

return result;
}
};

struct TEvPrivate {
@@ -227,13 +285,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
ReadRowsCount += replyResultStats.ReadRowsCount;
ReadBytesCount += replyResultStats.ReadBytesCount;

auto status = FetchInputRows();
if (!StreamLookupWorker->IsOverloaded()) {
FetchInputRows();
}

if (Partitioning) {
ProcessInputRows();
}

const bool inputRowsFinished = status == NUdf::EFetchStatus::Finish;
const bool inputRowsFinished = LastFetchStatus == NUdf::EFetchStatus::Finish;
const bool allReadsFinished = AllReadsFinished();
const bool allRowsProcessed = StreamLookupWorker->AllRowsProcessed();

@@ -308,14 +368,14 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
const auto& record = ev->Get()->Record;


auto readIt = Reads.find(record.GetReadId());
if (readIt == Reads.end() || readIt->second.State != EReadState::Running) {
CA_LOG_D("Drop read with readId: " << record.GetReadId() << ", because it's already completed or blocked");
return;
}

auto& read = readIt->second;
ui64 shardId = read.ShardId;

CA_LOG_D("Recv TEvReadResult (stream lookup) from ShardID=" << read.ShardId
<< ", Table = " << StreamLookupWorker->GetTablePath()
@@ -380,13 +440,13 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
case Ydb::StatusIds::NOT_FOUND:
{
StreamLookupWorker->ResetRowsProcessing(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey);
read.SetFinished();
CA_LOG_D("NOT_FOUND was received from tablet: " << read.ShardId << ". "
<< getIssues().ToOneLineString());
Reads.erase(read);
return ResolveTableShards();
}
case Ydb::StatusIds::OVERLOADED: {
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
if (CheckTotalRetriesExeeded() || Reads.CheckShardRetriesExeeded(read)) {
return replyError(
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
NYql::NDqProto::StatusIds::OVERLOADED);
@@ -397,7 +457,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
return RetryTableRead(read, /*allowInstantRetry = */false);
}
case Ydb::StatusIds::INTERNAL_ERROR: {
if (CheckTotalRetriesExeeded() || CheckShardRetriesExeeded(read)) {
if (CheckTotalRetriesExeeded() || Reads.CheckShardRetriesExeeded(read)) {
return replyError(
TStringBuilder() << "Table '" << StreamLookupWorker->GetTablePath() << "' retry limit exceeded.",
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
@@ -416,7 +476,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
read.LastSeqNo = record.GetSeqNo();

if (record.GetFinished()) {
read.SetFinished();
Reads.erase(read);
} else {
YQL_ENSURE(record.HasContinuationToken(), "Successful TEvReadResult should contain continuation token");
NKikimrTxDataShard::TReadContinuationToken continuationToken;
@@ -454,7 +514,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

auto guard = BindAllocator();
StreamLookupWorker->AddResult(TKqpStreamLookupWorker::TShardReadResult{
read.ShardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())
});
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
@@ -463,11 +523,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
CA_LOG_D("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId);

const auto& tabletId = ev->Get()->TabletId;
auto shardIt = ReadsPerShard.find(tabletId);
YQL_ENSURE(shardIt != ReadsPerShard.end());

TVector<TReadState*> toRetry;
for (auto* read : shardIt->second.Reads) {
for (auto* read : Reads.GetShardReads(tabletId)) {
if (read->State == EReadState::Running) {
Counters->IteratorDeliveryProblems->Inc();
toRetry.push_back(read);
@@ -500,27 +558,24 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

if ((read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) || read.State == EReadState::Blocked) {
if (ev->Get()->InstantStart) {
read.SetFinished();
auto requests = StreamLookupWorker->RebuildRequest(read.Id, read.FirstUnprocessedQuery, read.LastProcessedKey, ReadId);
for (auto& request : requests) {
StartTableRead(read.ShardId, std::move(request));
}
Reads.erase(read);
} else {
RetryTableRead(read);
}
}
}

NUdf::EFetchStatus FetchInputRows() {
void FetchInputRows() {
auto guard = BindAllocator();

NUdf::EFetchStatus status;
NUdf::TUnboxedValue row;
while ((status = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
while ((LastFetchStatus = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
Copilot AI (May 27, 2025): [nitpick] Consider updating the documentation or comments for FetchInputRows to reflect its revised behavior of setting LastFetchStatus instead of returning a fetch status.
StreamLookupWorker->AddInputRow(std::move(row));
}

return status;
}
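Following up on the review note above, one way the suggested comment could read, shown against the FetchInputRows body from this diff (the comment wording is an assumption, not part of the PR):

// Drains Input: each row fetched with EFetchStatus::Ok is handed to StreamLookupWorker.
// Nothing is returned anymore; the last status from Input.Fetch() is stored in
// LastFetchStatus, and the caller checks LastFetchStatus == NUdf::EFetchStatus::Finish
// to decide whether the input rows are finished.
void FetchInputRows() {
    auto guard = BindAllocator();

    NUdf::TUnboxedValue row;
    while ((LastFetchStatus = Input.Fetch(row)) == NUdf::EFetchStatus::Ok) {
        StreamLookupWorker->AddInputRow(std::move(row));
    }
}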

void ProcessInputRows() {
@@ -580,9 +635,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku

auto readId = read.Id;
auto lastSeqNo = read.LastSeqNo;
const auto [readIt, succeeded] = Reads.insert({readId, std::move(read)});
YQL_ENSURE(succeeded);
ReadsPerShard[shardId].Reads.push_back(&readIt->second);
Reads.insert(std::move(read));

if (auto delay = ShardTimeout()) {
TlsActivationContext->Schedule(
@@ -596,11 +649,6 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
return limit && TotalRetryAttempts + 1 > *limit;
}

bool CheckShardRetriesExeeded(TReadState& failedRead) {
const auto& shardState = ReadsPerShard[failedRead.ShardId];
return shardState.RetryAttempts + 1 > MaxShardRetries();
}

void RetryTableRead(TReadState& failedRead, bool allowInstantRetry = true) {
CA_LOG_D("Retry reading of table: " << StreamLookupWorker->GetTablePath() << ", readId: " << failedRead.Id
<< ", shardId: " << failedRead.ShardId);
@@ -611,21 +659,19 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
}
++TotalRetryAttempts;

if (CheckShardRetriesExeeded(failedRead)) {
if (Reads.CheckShardRetriesExeeded(failedRead)) {
StreamLookupWorker->ResetRowsProcessing(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey);
failedRead.SetFinished();
Reads.erase(failedRead);
return ResolveTableShards();
}
auto& shardState = ReadsPerShard[failedRead.ShardId];
++shardState.RetryAttempts;

auto delay = CalcDelay(shardState.RetryAttempts, allowInstantRetry);
auto delay = Reads.CalcDelayForShard(failedRead, allowInstantRetry);
if (delay == TDuration::Zero()) {
failedRead.SetFinished();
auto requests = StreamLookupWorker->RebuildRequest(failedRead.Id, failedRead.FirstUnprocessedQuery, failedRead.LastProcessedKey, ReadId);
for (auto& request : requests) {
StartTableRead(failedRead.ShardId, std::move(request));
}
Reads.erase(failedRead);
} else {
CA_LOG_D("Schedule retry atempt for readId: " << failedRead.Id << " after " << delay);
TlsActivationContext->Schedule(
@@ -671,13 +717,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
}

bool AllReadsFinished() const {
for (const auto& [_, read] : Reads) {
if (!read.Finished()) {
return false;
}
}

return true;
return Reads.InFlightReads() == 0;
}

TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() {
@@ -715,8 +755,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
const TMaybe<ui64> LockTxId;
const TMaybe<ui32> NodeLockId;
const TMaybe<NKikimrDataEvents::ELockMode> LockMode;
std::unordered_map<ui64, TReadState> Reads;
std::unordered_map<ui64, TShardState> ReadsPerShard;
TReads Reads;
NUdf::EFetchStatus LastFetchStatus = NUdf::EFetchStatus::Yield;
std::shared_ptr<const TVector<TKeyDesc::TPartitionInfo>> Partitioning;
const TDuration SchemeCacheRequestTimeout;
NActors::TActorId SchemeCacheRequestTimeoutTimer;
22 changes: 15 additions & 7 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -13,6 +13,8 @@
namespace NKikimr {
namespace NKqp {

constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500;
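// Used by TKqpJoinRows::IsOverloaded() below: once any of the worker's internal queues (unprocessed rows, pending left rows, buffered results) reaches this limit, the stream lookup actor stops fetching new input rows.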

namespace {
std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo,
const std::vector<NScheme::TTypeInfo>& keyColumnTypes, const TOwnedTableRange& range) {
@@ -286,12 +288,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
ReadResults.emplace_back(std::move(result));
}

bool IsOverloaded() final {
return false;
}

TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
TReadResultStats resultStats;
bool sizeLimitExceeded = false;
batch.clear();

while (!ReadResults.empty() && !sizeLimitExceeded) {
while (!ReadResults.empty() && !resultStats.SizeLimitExceeded) {
auto& result = ReadResults.front();
for (; result.UnprocessedResultRow < result.ReadResult->Get()->GetRowsCount(); ++result.UnprocessedResultRow) {
const auto& resultRow = result.ReadResult->Get()->GetCells(result.UnprocessedResultRow);
@@ -317,10 +322,10 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
}

if (rowSize + (i64)resultStats.ResultBytesCount > freeSpace) {
sizeLimitExceeded = true;
resultStats.SizeLimitExceeded = true;
}

if (resultStats.ResultRowsCount && sizeLimitExceeded) {
if (resultStats.ResultRowsCount && resultStats.SizeLimitExceeded) {
row.DeleteUnreferenced();
break;
}
@@ -456,6 +461,10 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
UnprocessedRows.emplace_back(std::make_pair(TOwnedCellVec(joinKeyCells), std::move(inputRow.GetElement(1))));
}

bool IsOverloaded() final {
return UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT;
}

std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) final {

@@ -730,7 +739,6 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {

TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
TReadResultStats resultStats;
bool sizeLimitExceeded = false;
batch.clear();

// we should process left rows that haven't matches on the right
@@ -759,7 +767,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
return ResultRowsBySeqNo.find(CurrentResultSeqNo);
};

while (!sizeLimitExceeded) {
while (!resultStats.SizeLimitExceeded) {
auto resultIt = getNextResult();
if (resultIt == ResultRowsBySeqNo.end()) {
break;
@@ -770,7 +778,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
auto& row = result.Rows[result.FirstUnprocessedRow];

if (resultStats.ResultRowsCount && resultStats.ResultBytesCount + row.Stats.ResultBytesCount > (ui64)freeSpace) {
sizeLimitExceeded = true;
resultStats.SizeLimitExceeded = true;
break;
}

4 changes: 3 additions & 1 deletion ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -27,6 +27,7 @@ class TKqpStreamLookupWorker {
ui64 ReadBytesCount = 0;
ui64 ResultRowsCount = 0;
ui64 ResultBytesCount = 0;
bool SizeLimitExceeded = false;

void Add(const TReadResultStats& other) {
ReadRowsCount += other.ReadRowsCount;
@@ -57,13 +58,14 @@ class TKqpStreamLookupWorker {
}

virtual void AddInputRow(NUdf::TUnboxedValue inputRow) = 0;
virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
virtual std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui32 firstUnprocessedQuery,
TMaybe<TOwnedCellVec> lastProcessedKey, ui64& newReadId) = 0;
virtual TReadList BuildRequests(const TPartitionInfo& partitioning, ui64& readId) = 0;
virtual void AddResult(TShardReadResult result) = 0;
virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0;
virtual bool AllRowsProcessed() = 0;
virtual void ResetRowsProcessing(ui64 readId, ui32 firstUnprocessedQuery, TMaybe<TOwnedCellVec> lastProcessedKey) = 0;
virtual bool IsOverloaded() = 0;

protected:
const NKikimrKqp::TKqpStreamLookupSettings Settings;