diff --git a/ydb/core/grpc_services/rpc_read_rows.cpp b/ydb/core/grpc_services/rpc_read_rows.cpp index bc08cf09a64d..473691670212 100644 --- a/ydb/core/grpc_services/rpc_read_rows.cpp +++ b/ydb/core/grpc_services/rpc_read_rows.cpp @@ -37,6 +37,11 @@ struct RequestedKeyColumn { Ydb::Type Type; }; +struct TShardReadState { + std::vector Keys; + ui32 FirstUnprocessedQuery = 0; +}; + } namespace { @@ -445,7 +450,7 @@ class TReadRowsRPC : public TActorBootstrapped { return (cmp < 0); }); Y_ABORT_UNLESS(it != partitions.end()); - ShardIdToKeys[it->ShardId].emplace_back(std::move(key)); + ShardIdToReadState[it->ShardId].Keys.emplace_back(std::move(key)); } } @@ -462,12 +467,13 @@ class TReadRowsRPC : public TActorBootstrapped { auto keyRange = resolvePartitionsResult->ResultSet[0].KeyDescription.Get(); CreateShardToKeysMapping(keyRange); - for (const auto& [shardId, keys] : ShardIdToKeys) { - SendRead(shardId, keys); + for (const auto& [shardId, state] : ShardIdToReadState) { + SendRead(shardId, state); } } - void SendRead(ui64 shardId, const std::vector& keys) { + void SendRead(ui64 shardId, const TShardReadState& readState) { + auto& keys = readState.Keys; auto request = std::make_unique(); auto& record = request->Record; @@ -482,8 +488,8 @@ class TReadRowsRPC : public TActorBootstrapped { record.SetResultFormat(::NKikimrDataEvents::FORMAT_CELLVEC); - for (auto& key : keys) { - request->Keys.emplace_back(TSerializedCellVec::Serialize(key)); + for (size_t i = readState.FirstUnprocessedQuery; i < keys.size(); ++i) { + request->Keys.emplace_back(TSerializedCellVec::Serialize(keys[i])); } LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC send TEvRead shardId : " << shardId << " keys.size(): " << keys.size()); @@ -500,7 +506,7 @@ class TReadRowsRPC : public TActorBootstrapped { // ReadRows can reply with the following statuses: // * SUCCESS // * INTERNAL_ERROR -- only if MaxRetries is reached - // * OVERLOADED -- client will retrie it with backoff + // * OVERLOADED -- client will retry it with backoff // * ABORTED -- code is used for all other DataShard errors const auto& status = msg->Record.GetStatus(); @@ -509,17 +515,20 @@ class TReadRowsRPC : public TActorBootstrapped { ui64 shardId = msg->Record.GetReadId(); + auto it = ShardIdToReadState.find(shardId); + if (it == ShardIdToReadState.end()) { + TStringStream ss; + ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode; + ReplyWithError(statusCode, ss.Str(), &issues); + return; + } + switch (statusCode) { case Ydb::StatusIds::SUCCESS: break; case Ydb::StatusIds::INTERNAL_ERROR: { - auto it = ShardIdToKeys.find(shardId); ++Retries; - if (it == ShardIdToKeys.end()) { - TStringStream ss; - ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode; - ReplyWithError(statusCode, ss.Str(), &issues); - } else if (Retries < MaxTotalRetries) { + if (Retries < MaxTotalRetries) { TStringStream ss; ss << "Reached MaxRetries count for DataShard# " << shardId << ", status# " << statusCode; ReplyWithError(statusCode, ss.Str(), &issues); @@ -540,8 +549,21 @@ class TReadRowsRPC : public TActorBootstrapped { return; } } + if (!msg->Record.HasFinished() || !msg->Record.GetFinished()) { + // We should have received continuation token if read is not finished. + TMaybe continuationToken = msg->Record.GetContinuationToken(); + + Y_ABORT_UNLESS(continuationToken); + + NKikimrTxDataShard::TReadContinuationToken token; + Y_ABORT_UNLESS(token.ParseFromString(*continuationToken), "Failed to parse continuation token"); + + // Save continuation token in case we will have to retry on error, but for now + // we just wait for the next batch of results. + it->second.FirstUnprocessedQuery = token.GetFirstUnprocessedQuery(); + ReadsInFlight++; + } } - Y_ABORT_UNLESS(msg->Record.HasFinished() && msg->Record.GetFinished()); LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC TEvReadResult RowsCount: " << msg->GetRowsCount()); EvReadResults.emplace_back(ev->Release().Release()); @@ -761,7 +783,7 @@ class TReadRowsRPC : public TActorBootstrapped { }; TVector RequestedColumnsMeta; - std::map> ShardIdToKeys; + std::map ShardIdToReadState; std::vector> EvReadResults; // TEvRead interface ui64 ReadsInFlight = 0;