Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 37 additions & 15 deletions ydb/core/grpc_services/rpc_read_rows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ struct RequestedKeyColumn {
Ydb::Type Type;
};

struct TShardReadState {
std::vector<TOwnedCellVec> Keys;
ui32 FirstUnprocessedQuery = 0;
};

}

namespace {
Expand Down Expand Up @@ -445,7 +450,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
return (cmp < 0);
});
Y_ABORT_UNLESS(it != partitions.end());
ShardIdToKeys[it->ShardId].emplace_back(std::move(key));
ShardIdToReadState[it->ShardId].Keys.emplace_back(std::move(key));
}
}

Expand All @@ -462,12 +467,13 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
auto keyRange = resolvePartitionsResult->ResultSet[0].KeyDescription.Get();

CreateShardToKeysMapping(keyRange);
for (const auto& [shardId, keys] : ShardIdToKeys) {
SendRead(shardId, keys);
for (const auto& [shardId, state] : ShardIdToReadState) {
SendRead(shardId, state);
}
}

void SendRead(ui64 shardId, const std::vector<TOwnedCellVec>& keys) {
void SendRead(ui64 shardId, const TShardReadState& readState) {
auto& keys = readState.Keys;
auto request = std::make_unique<TEvDataShard::TEvRead>();
auto& record = request->Record;

Expand All @@ -482,8 +488,8 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {

record.SetResultFormat(::NKikimrDataEvents::FORMAT_CELLVEC);

for (auto& key : keys) {
request->Keys.emplace_back(TSerializedCellVec::Serialize(key));
for (size_t i = readState.FirstUnprocessedQuery; i < keys.size(); ++i) {
request->Keys.emplace_back(TSerializedCellVec::Serialize(keys[i]));
}

LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC send TEvRead shardId : " << shardId << " keys.size(): " << keys.size());
Expand All @@ -500,7 +506,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
// ReadRows can reply with the following statuses:
// * SUCCESS
// * INTERNAL_ERROR -- only if MaxRetries is reached
// * OVERLOADED -- client will retrie it with backoff
// * OVERLOADED -- client will retry it with backoff
// * ABORTED -- code is used for all other DataShard errors

const auto& status = msg->Record.GetStatus();
Expand All @@ -509,17 +515,20 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {

ui64 shardId = msg->Record.GetReadId();

auto it = ShardIdToReadState.find(shardId);
if (it == ShardIdToReadState.end()) {
TStringStream ss;
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
ReplyWithError(statusCode, ss.Str(), &issues);
return;
}

switch (statusCode) {
case Ydb::StatusIds::SUCCESS:
break;
case Ydb::StatusIds::INTERNAL_ERROR: {
auto it = ShardIdToKeys.find(shardId);
++Retries;
if (it == ShardIdToKeys.end()) {
TStringStream ss;
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
ReplyWithError(statusCode, ss.Str(), &issues);
} else if (Retries < MaxTotalRetries) {
if (Retries < MaxTotalRetries) {
TStringStream ss;
ss << "Reached MaxRetries count for DataShard# " << shardId << ", status# " << statusCode;
ReplyWithError(statusCode, ss.Str(), &issues);
Expand All @@ -540,8 +549,21 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
return;
}
}
if (!msg->Record.HasFinished() || !msg->Record.GetFinished()) {
// We should have received continuation token if read is not finished.
TMaybe<TString> continuationToken = msg->Record.GetContinuationToken();

Y_ABORT_UNLESS(continuationToken);

NKikimrTxDataShard::TReadContinuationToken token;
Y_ABORT_UNLESS(token.ParseFromString(*continuationToken), "Failed to parse continuation token");

// Save continuation token in case we will have to retry on error, but for now
// we just wait for the next batch of results.
it->second.FirstUnprocessedQuery = token.GetFirstUnprocessedQuery();
ReadsInFlight++;
}
}
Y_ABORT_UNLESS(msg->Record.HasFinished() && msg->Record.GetFinished());
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC TEvReadResult RowsCount: " << msg->GetRowsCount());

EvReadResults.emplace_back(ev->Release().Release());
Expand Down Expand Up @@ -761,7 +783,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
};
TVector<TColumnMeta> RequestedColumnsMeta;

std::map<ui64, std::vector<TOwnedCellVec>> ShardIdToKeys;
std::map<ui64, TShardReadState> ShardIdToReadState;
std::vector<std::unique_ptr<TEvDataShard::TEvReadResult>> EvReadResults;
// TEvRead interface
ui64 ReadsInFlight = 0;
Expand Down
Loading