Skip to content

Commit a0549a5

Browse files
alexv-smirnovsnaury
authored andcommitted
Fix readrows RPC (#14817) (#15773)
1 parent 44ae19d commit a0549a5

File tree

4 files changed

+155
-15
lines changed

4 files changed

+155
-15
lines changed

ydb/core/grpc_services/rpc_read_rows.cpp

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@ struct RequestedKeyColumn {
3737
Ydb::Type Type;
3838
};
3939

40+
struct TShardReadState {
41+
std::vector<TOwnedCellVec> Keys;
42+
ui32 FirstUnprocessedQuery = 0;
43+
};
44+
4045
}
4146

4247
namespace {
@@ -445,7 +450,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
445450
return (cmp < 0);
446451
});
447452
Y_ABORT_UNLESS(it != partitions.end());
448-
ShardIdToKeys[it->ShardId].emplace_back(std::move(key));
453+
ShardIdToReadState[it->ShardId].Keys.emplace_back(std::move(key));
449454
}
450455
}
451456

@@ -462,12 +467,13 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
462467
auto keyRange = resolvePartitionsResult->ResultSet[0].KeyDescription.Get();
463468

464469
CreateShardToKeysMapping(keyRange);
465-
for (const auto& [shardId, keys] : ShardIdToKeys) {
466-
SendRead(shardId, keys);
470+
for (const auto& [shardId, state] : ShardIdToReadState) {
471+
SendRead(shardId, state);
467472
}
468473
}
469474

470-
void SendRead(ui64 shardId, const std::vector<TOwnedCellVec>& keys) {
475+
void SendRead(ui64 shardId, const TShardReadState& readState) {
476+
auto& keys = readState.Keys;
471477
auto request = std::make_unique<TEvDataShard::TEvRead>();
472478
auto& record = request->Record;
473479

@@ -482,8 +488,8 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
482488

483489
record.SetResultFormat(::NKikimrDataEvents::FORMAT_CELLVEC);
484490

485-
for (auto& key : keys) {
486-
request->Keys.emplace_back(TSerializedCellVec::Serialize(key));
491+
for (size_t i = readState.FirstUnprocessedQuery; i < keys.size(); ++i) {
492+
request->Keys.emplace_back(TSerializedCellVec::Serialize(keys[i]));
487493
}
488494

489495
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC send TEvRead shardId : " << shardId << " keys.size(): " << keys.size());
@@ -500,7 +506,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
500506
// ReadRows can reply with the following statuses:
501507
// * SUCCESS
502508
// * INTERNAL_ERROR -- only if MaxRetries is reached
503-
// * OVERLOADED -- client will retrie it with backoff
509+
// * OVERLOADED -- client will retry it with backoff
504510
// * ABORTED -- code is used for all other DataShard errors
505511

506512
const auto& status = msg->Record.GetStatus();
@@ -509,17 +515,20 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
509515

510516
ui64 shardId = msg->Record.GetReadId();
511517

518+
auto it = ShardIdToReadState.find(shardId);
519+
if (it == ShardIdToReadState.end()) {
520+
TStringStream ss;
521+
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
522+
ReplyWithError(statusCode, ss.Str(), &issues);
523+
return;
524+
}
525+
512526
switch (statusCode) {
513527
case Ydb::StatusIds::SUCCESS:
514528
break;
515529
case Ydb::StatusIds::INTERNAL_ERROR: {
516-
auto it = ShardIdToKeys.find(shardId);
517530
++Retries;
518-
if (it == ShardIdToKeys.end()) {
519-
TStringStream ss;
520-
ss << "Got unknown shardId from TEvReadResult# " << shardId << ", status# " << statusCode;
521-
ReplyWithError(statusCode, ss.Str(), &issues);
522-
} else if (Retries < MaxTotalRetries) {
531+
if (Retries < MaxTotalRetries) {
523532
TStringStream ss;
524533
ss << "Reached MaxRetries count for DataShard# " << shardId << ", status# " << statusCode;
525534
ReplyWithError(statusCode, ss.Str(), &issues);
@@ -540,8 +549,21 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
540549
return;
541550
}
542551
}
552+
if (!msg->Record.HasFinished() || !msg->Record.GetFinished()) {
553+
// We should have received continuation token if read is not finished.
554+
TMaybe<TString> continuationToken = msg->Record.GetContinuationToken();
555+
556+
Y_ABORT_UNLESS(continuationToken);
557+
558+
NKikimrTxDataShard::TReadContinuationToken token;
559+
Y_ABORT_UNLESS(token.ParseFromString(*continuationToken), "Failed to parse continuation token");
560+
561+
// Save continuation token in case we will have to retry on error, but for now
562+
// we just wait for the next batch of results.
563+
it->second.FirstUnprocessedQuery = token.GetFirstUnprocessedQuery();
564+
ReadsInFlight++;
565+
}
543566
}
544-
Y_ABORT_UNLESS(msg->Record.HasFinished() && msg->Record.GetFinished());
545567
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC TEvReadResult RowsCount: " << msg->GetRowsCount());
546568

547569
EvReadResults.emplace_back(ev->Release().Release());
@@ -761,7 +783,7 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
761783
};
762784
TVector<TColumnMeta> RequestedColumnsMeta;
763785

764-
std::map<ui64, std::vector<TOwnedCellVec>> ShardIdToKeys;
786+
std::map<ui64, TShardReadState> ShardIdToReadState;
765787
std::vector<std::unique_ptr<TEvDataShard::TEvReadResult>> EvReadResults;
766788
// TEvRead interface
767789
ui64 ReadsInFlight = 0;

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "kqp_ut_common.h"
22

33
#include <ydb/core/base/backtrace.h>
4+
#include <ydb/core/tx/datashard/datashard.h>
45
#include <ydb/core/tx/schemeshard/schemeshard.h>
56
#include <ydb/core/kqp/counters/kqp_counters.h>
67
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
@@ -134,6 +135,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
134135
ServerSettings->SetEnableTablePgTypes(true);
135136
ServerSettings->SetEnablePgSyntax(true);
136137
ServerSettings->S3ActorsFactory = settings.S3ActorsFactory;
138+
ServerSettings->Controls = settings.Controls;
137139
ServerSettings->SetEnableForceFollowers(settings.EnableForceFollowers);
138140

139141
if (!settings.FeatureFlags.HasEnableOlapCompression()) {
@@ -1470,6 +1472,34 @@ void WaitForZeroReadIterators(Tests::TServer& server, const TString& path) {
14701472
UNIT_ASSERT_C(iterators == 0, "Unable to wait for proper read iterator count, it looks like cancelation doesn`t work (" << iterators << ")");
14711473
}
14721474

1475+
TTableId ResolveTableId(Tests::TServer* server, TActorId sender, const TString& path) {
1476+
auto response = Navigate(*server->GetRuntime(), sender, path, NSchemeCache::TSchemeCacheNavigate::EOp::OpUnknown);
1477+
return response->ResultSet.at(0).TableId;
1478+
}
1479+
1480+
NKikimrTxDataShard::TEvCompactTableResult CompactTable(
1481+
Tests::TServer* server, ui64 shardId, const TTableId& tableId, bool compactBorrowed)
1482+
{
1483+
TTestActorRuntime* runtime = server->GetRuntime();
1484+
auto sender = runtime->AllocateEdgeActor();
1485+
auto request = MakeHolder<TEvDataShard::TEvCompactTable>(tableId.PathId);
1486+
request->Record.SetCompactBorrowed(compactBorrowed);
1487+
runtime->SendToPipe(shardId, sender, request.Release(), 0, GetPipeConfigWithRetries());
1488+
1489+
auto ev = runtime->GrabEdgeEventRethrow<TEvDataShard::TEvCompactTableResult>(sender);
1490+
return ev->Get()->Record;
1491+
}
1492+
1493+
void WaitForCompaction(Tests::TServer* server, const TString& path, bool compactBorrowed) {
1494+
TTestActorRuntime* runtime = server->GetRuntime();
1495+
auto sender = runtime->AllocateEdgeActor();
1496+
auto shards = GetTableShards(server, sender, path);
1497+
auto tableId = ResolveTableId(server, sender, path);
1498+
for (auto shard : shards) {
1499+
CompactTable(server, shard, tableId, compactBorrowed);
1500+
}
1501+
}
1502+
14731503
NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& params) {
14741504
if (auto ops = opt.GetMapSafe().find("Operators"); ops != opt.GetMapSafe().end()) {
14751505
auto opName = ops->second.GetArraySafe()[0].GetMapSafe().at("Name").GetStringSafe();

ydb/core/kqp/ut/common/kqp_ut_common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
8989
NKqp::IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory = std::make_shared<NKqp::TKqpFederatedQuerySetupFactoryNoop>();
9090
NMonitoring::TDynamicCounterPtr CountersRoot = MakeIntrusive<NMonitoring::TDynamicCounters>();
9191
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory = NYql::NDq::CreateDefaultS3ActorsFactory();
92+
NKikimrConfig::TImmediateControlsConfig Controls;
9293

9394
TKikimrSettings()
9495
{
@@ -122,6 +123,7 @@ struct TKikimrSettings: public TTestFeatureFlagsHolder<TKikimrSettings> {
122123
TKikimrSettings& SetUseRealThreads(bool value) { UseRealThreads = value; return *this; };
123124
TKikimrSettings& SetEnableForceFollowers(bool value) { EnableForceFollowers = value; return *this; };
124125
TKikimrSettings& SetS3ActorsFactory(std::shared_ptr<NYql::NDq::IS3ActorsFactory> value) { S3ActorsFactory = std::move(value); return *this; };
126+
TKikimrSettings& SetControls(const NKikimrConfig::TImmediateControlsConfig& value) { Controls = value; return *this; }
125127
TKikimrSettings& SetColumnShardAlterObjectEnabled(bool enable) {
126128
AppConfig.MutableColumnShardConfig()->SetAlterObjectEnabled(enable);
127129
return *this;
@@ -383,6 +385,8 @@ TVector<ui64> GetColumnTableShards(Tests::TServer* server, TActorId sender, cons
383385
void WaitForZeroSessions(const NKqp::TKqpCounters& counters);
384386
void WaitForZeroReadIterators(Tests::TServer& server, const TString& path);
385387

388+
void WaitForCompaction(Tests::TServer* server, const TString& path, bool compactBorrowed = false);
389+
386390
bool JoinOrderAndAlgosMatch(const TString& optimized, const TString& reference);
387391

388392
struct TGetPlanParams {

ydb/core/kqp/ut/opt/kqp_kv_ut.cpp

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
22

3+
#include <ydb/core/tx/datashard/datashard.h>
34
#include <yql/essentials/parser/pg_catalog/catalog.h>
45
#include <yql/essentials/parser/pg_wrapper/interface/codec.h>
56
#include <yql/essentials/utils/log/log.h>
@@ -366,6 +367,89 @@ Y_UNIT_TEST_SUITE(KqpKv) {
366367
CompareYson(Sprintf("[[[%du];[%du]]]", valueToReturn_1, valueToReturn_2), TString{res});
367368
}
368369

370+
Y_UNIT_TEST_TWIN(ReadRows_ExternalBlobs, NewPrecharge) {
371+
NKikimrConfig::TImmediateControlsConfig controls;
372+
373+
if (NewPrecharge) {
374+
controls.MutableDataShardControls()->SetReadIteratorKeysExtBlobsPrecharge(1);
375+
}
376+
377+
NKikimrConfig::TFeatureFlags flags;
378+
flags.SetEnablePublicApiExternalBlobs(true);
379+
auto settings = TKikimrSettings()
380+
.SetFeatureFlags(flags)
381+
.SetWithSampleTables(false)
382+
.SetControls(controls);
383+
auto kikimr = TKikimrRunner{settings};
384+
385+
auto db = kikimr.GetTableClient();
386+
auto session = db.CreateSession().GetValueSync().GetSession();
387+
const auto tableName = "/Root/TestTable";
388+
const auto keyColumnName_1 = "blob_id";
389+
const auto keyColumnName_2 = "chunk_num";
390+
const auto dataColumnName = "data";
391+
392+
TTableBuilder builder;
393+
builder
394+
.BeginStorageSettings()
395+
.SetExternal("test")
396+
.SetStoreExternalBlobs(true)
397+
.EndStorageSettings();
398+
builder.AddNonNullableColumn(keyColumnName_1, EPrimitiveType::Uuid);
399+
builder.AddNonNullableColumn(keyColumnName_2, EPrimitiveType::Int32);
400+
builder.SetPrimaryKeyColumns({keyColumnName_1, keyColumnName_2});
401+
builder.AddNullableColumn(dataColumnName, EPrimitiveType::String);
402+
403+
auto result = session.CreateTable(tableName, builder.Build()).GetValueSync();
404+
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
405+
406+
TString largeValue(1_MB, 'L');
407+
408+
NYdb::TValueBuilder rows;
409+
rows.BeginList();
410+
for (int i = 0; i < 10; i++) {
411+
rows.AddListItem()
412+
.BeginStruct()
413+
.AddMember(keyColumnName_1).Uuid(NYdb::TUuidValue("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"))
414+
.AddMember(keyColumnName_2).Int32(i)
415+
.AddMember(dataColumnName).String(largeValue)
416+
.EndStruct();
417+
}
418+
rows.EndList();
419+
420+
auto upsertResult = db.BulkUpsert(tableName, rows.Build()).GetValueSync();
421+
UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
422+
423+
NYdb::TValueBuilder keys;
424+
keys.BeginList();
425+
for (int i = 0; i < 10; i++) {
426+
keys.AddListItem()
427+
.BeginStruct()
428+
.AddMember(keyColumnName_1).Uuid(NYdb::TUuidValue("65df1ec1-a97d-47b2-ae56-3c023da6ee8c"))
429+
.AddMember(keyColumnName_2).Int32(i)
430+
.EndStruct();
431+
}
432+
keys.EndList();
433+
434+
auto server = &kikimr.GetTestServer();
435+
436+
WaitForCompaction(server, tableName);
437+
438+
auto selectResult = db.ReadRows(tableName, keys.Build()).GetValueSync();
439+
440+
UNIT_ASSERT_C(selectResult.IsSuccess(), selectResult.GetIssues().ToString());
441+
442+
TResultSetParser parser{selectResult.GetResultSet()};
443+
UNIT_ASSERT_VALUES_EQUAL(parser.RowsCount(), 10);
444+
445+
UNIT_ASSERT(parser.TryNextRow());
446+
447+
auto val = parser.GetValue(0);
448+
TValueParser valParser(val);
449+
TUuidValue v = valParser.GetUuid();
450+
Cout << v.ToString() << Endl;
451+
}
452+
369453
TVector<::ReadRowsPgParam> readRowsPgParams
370454
{
371455
{.TypeId = BOOLOID, .TypeMod={}, .ValueContent="t"},

0 commit comments

Comments
 (0)