Skip to content

Commit 7bee2dc

Browse files
author
kinash-varvara
authored
Merge branch 'ydb-platform:main' into dev
2 parents 61b2355 + 6c7989f commit 7bee2dc

File tree

28 files changed

+541
-1048
lines changed

28 files changed

+541
-1048
lines changed

ydb/apps/ydb/main.cpp

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,5 @@
1-
#include <ydb/public/lib/ydb_cli/commands/ydb_service_topic.h>
21
#include <ydb/apps/ydb/commands/ydb_cloud_root.h>
3-
4-
TVector<NYdb::NTopic::ECodec> NYdb::NConsoleClient::InitAllowedCodecs() {
5-
return TVector<NYdb::NTopic::ECodec>{
6-
NYdb::NTopic::ECodec::RAW,
7-
NYdb::NTopic::ECodec::ZSTD,
8-
NYdb::NTopic::ECodec::GZIP,
9-
};
10-
}
2+
#include <ydb/public/lib/ydb_cli/commands/ydb_common.h>
113

124
int main(int argc, char **argv) {
135
try {

ydb/core/kqp/ut/perf/kqp_workload_ut.cpp

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44

55
#include <ydb/library/workload/workload_factory.h>
66
#include <ydb/library/workload/stock_workload.h>
7+
#include <ydb/library/workload/stock_workload.h_serialized.h>
78
#include <ydb/library/workload/kv_workload.h>
9+
#include <ydb/library/workload/kv_workload.h_serialized.h>
810

911
#include <library/cpp/threading/local_executor/local_executor.h>
12+
#include <util/generic/serialized_enum.h>
1013

1114
namespace NKikimr::NKqp {
1215

@@ -28,30 +31,24 @@ void ExecuteQuery(TTableClient& db, TSession& session, NYdbWorkload::TQueryInfo&
2831
}
2932
}
3033

31-
void Test(NYdbWorkload::EWorkload workloadType) {
34+
void Test(const TString& workloadType) {
3235
auto settings = TKikimrSettings().SetWithSampleTables(false);
3336
auto kikimr = TKikimrRunner{settings};
3437
auto db = kikimr.GetTableClient();
3538
auto session = db.CreateSession().GetValueSync().GetSession();
3639

37-
std::unique_ptr<NYdbWorkload::TWorkloadParams> params;
38-
if (workloadType == NYdbWorkload::EWorkload::STOCK) {
39-
auto stockParams = std::make_unique<NYdbWorkload::TStockWorkloadParams>();
40+
auto params = NYdbWorkload::TWorkloadFactory::MakeHolder(workloadType);
41+
UNIT_ASSERT(params);
42+
if (auto* stockParams = dynamic_cast<NYdbWorkload::TStockWorkloadParams*>(params.Get())) {
4043
stockParams->ProductCount = 100;
4144
stockParams->Quantity = 1000;
4245
stockParams->OrderCount = 100;
4346
stockParams->Limit = 10;
4447
stockParams->MinPartitions = 40;
4548
stockParams->PartitionsByLoad = true;
46-
params = std::move(stockParams);
47-
} else if (workloadType == NYdbWorkload::EWorkload::KV) {
48-
params = std::make_unique<NYdbWorkload::TKvWorkloadParams>();
49-
} else {
50-
UNIT_ASSERT(false);
5149
}
52-
UNIT_ASSERT(params);
5350
params->DbPath = "/Root";
54-
auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get());
51+
auto workloadQueryGen = params->CreateGenerator();
5552

5653
auto result = session.ExecuteSchemeQuery(workloadQueryGen->GetDDLQueries()).GetValueSync();
5754
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
@@ -62,17 +59,17 @@ void Test(NYdbWorkload::EWorkload workloadType) {
6259
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
6360
}
6461
int maxType = 0;
65-
if (workloadType == NYdbWorkload::EWorkload::STOCK) {
66-
maxType = static_cast<int>(NYdbWorkload::TStockWorkloadGenerator::EType::MaxType);
67-
} else if (workloadType == NYdbWorkload::EWorkload::KV) {
68-
maxType = static_cast<int>(NYdbWorkload::TKvWorkloadGenerator::EType::MaxType);
62+
if (workloadType == "stock") {
63+
maxType = GetEnumItemsCount<NYdbWorkload::TStockWorkloadGenerator::EType>();
64+
} else if (workloadType == "kv") {
65+
maxType = GetEnumItemsCount<NYdbWorkload::TKvWorkloadGenerator::EType>();
6966
}
7067
for (int type = 0; type < maxType; ++type) {
7168
size_t InFlight = 10;
7269
NPar::LocalExecutor().RunAdditionalThreads(InFlight);
7370
NPar::LocalExecutor().ExecRange([&db, type, &params, workloadType](int /*id*/) {
7471
TTimer t;
75-
auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get());
72+
auto workloadQueryGen = params->CreateGenerator();
7673
auto session = db.CreateSession().GetValueSync().GetSession();
7774
for (size_t i = 0; i < REPEATS; ++i) {
7875
auto queriesList = workloadQueryGen->GetWorkload(type);
@@ -88,11 +85,11 @@ void Test(NYdbWorkload::EWorkload workloadType) {
8885

8986
Y_UNIT_TEST_SUITE(KqpWorkload) {
9087
Y_UNIT_TEST(STOCK) {
91-
Test(NYdbWorkload::EWorkload::STOCK);
88+
Test("stock");
9289
}
9390

9491
Y_UNIT_TEST(KV) {
95-
Test(NYdbWorkload::EWorkload::KV);
92+
Test("kv");
9693
}
9794
}
9895
}

ydb/core/load_test/kqp.cpp

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -254,30 +254,30 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
254254
IncreaseSessions = cmd.GetIncreaseSessions();
255255
Total = std::make_unique<MonitoringData>();
256256

257-
NYdbWorkload::TWorkloadFactory factory;
258257
if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoad::WorkloadCase::kStock) {
259-
WorkloadClass = NYdbWorkload::EWorkload::STOCK;
260-
NYdbWorkload::TStockWorkloadParams params;
261-
params.PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad();
262-
params.OrderCount = cmd.GetStock().GetOrderCount();
263-
params.ProductCount = cmd.GetStock().GetProductCount();
264-
params.Quantity = cmd.GetStock().GetQuantity();
265-
params.Limit = cmd.GetStock().GetLimit();
266-
params.DbPath = WorkingDir;
267-
params.MinPartitions = UniformPartitionsCount;
268-
WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, &params);
258+
WorkloadClass = "stock";
259+
auto params = std::make_shared<NYdbWorkload::TStockWorkloadParams>();
260+
params->PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad();
261+
params->OrderCount = cmd.GetStock().GetOrderCount();
262+
params->ProductCount = cmd.GetStock().GetProductCount();
263+
params->Quantity = cmd.GetStock().GetQuantity();
264+
params->Limit = cmd.GetStock().GetLimit();
265+
params->DbPath = WorkingDir;
266+
params->MinPartitions = UniformPartitionsCount;
267+
WorkloadQueryGen = std::make_shared<NYdbWorkload::TStockWorkloadGenerator>(params.get());
268+
WorkloadQueryGenParams = params;
269269
} else if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoad::WorkloadCase::kKv) {
270-
WorkloadClass = NYdbWorkload::EWorkload::KV;
271-
NYdbWorkload::TKvWorkloadParams params;
272-
params.InitRowCount = cmd.GetKv().GetInitRowCount();
273-
params.PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad();
274-
params.MaxFirstKey = cmd.GetKv().GetMaxFirstKey();
275-
params.StringLen = cmd.GetKv().GetStringLen();
276-
params.ColumnsCnt = cmd.GetKv().GetColumnsCnt();
277-
params.RowsCnt = cmd.GetKv().GetRowsCnt();
278-
params.MinPartitions = UniformPartitionsCount;
279-
params.DbPath = WorkingDir;
280-
WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, &params);
270+
WorkloadClass = "kv";
271+
auto params = std::make_shared<NYdbWorkload::TKvWorkloadParams>();
272+
params->InitRowCount = cmd.GetKv().GetInitRowCount();
273+
params->PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad();
274+
params->MaxFirstKey = cmd.GetKv().GetMaxFirstKey();
275+
params->StringLen = cmd.GetKv().GetStringLen();
276+
params->ColumnsCnt = cmd.GetKv().GetColumnsCnt();
277+
params->RowsCnt = cmd.GetKv().GetRowsCnt();
278+
params->MinPartitions = UniformPartitionsCount;
279+
WorkloadQueryGen = std::make_shared<NYdbWorkload::TKvWorkloadGenerator>(params.get());
280+
WorkloadQueryGenParams = params;
281281
} else {
282282
return;
283283
}
@@ -301,8 +301,8 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
301301

302302
Become(&TKqpLoadActor::StateStart);
303303

304-
if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) {
305-
NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGen->GetParams());
304+
if (WorkloadClass == "stock") {
305+
NYdbWorkload::TStockWorkloadParams* params = static_cast<NYdbWorkload::TStockWorkloadParams*>(WorkloadQueryGenParams.get());
306306
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload STOCK, Params: {"
307307
<< "PartitionsByLoad: " << params->PartitionsByLoad << " "
308308
<< "OrderCount: " << params->OrderCount << " "
@@ -311,8 +311,8 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
311311
<< "Limit: " << params->Limit << " "
312312
<< "DbPath: " << params->DbPath << " "
313313
<< "MinPartitions: " << params->MinPartitions);
314-
} else if (WorkloadClass == NYdbWorkload::EWorkload::KV) {
315-
NYdbWorkload::TKvWorkloadParams* params = static_cast<NYdbWorkload::TKvWorkloadParams*>(WorkloadQueryGen->GetParams());
314+
} else if (WorkloadClass == "kv") {
315+
NYdbWorkload::TKvWorkloadParams* params = static_cast<NYdbWorkload::TKvWorkloadParams*>(WorkloadQueryGenParams.get());
316316
LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload KV, Params: {"
317317
<< "InitRowCount: " << params->InitRowCount << " "
318318
<< "PartitionsByLoad: " << params->PartitionsByLoad << " "
@@ -666,14 +666,15 @@ class TKqpLoadActor : public TActorBootstrapped<TKqpLoadActor> {
666666
size_t NumOfSessions = 0;
667667
bool IncreaseSessions = false;
668668
size_t ResultsReceived = 0;
669-
NYdbWorkload::EWorkload WorkloadClass;
669+
TString WorkloadClass;
670670
NKikimrKqp::EQueryType QueryType;
671671

672672
NYdbWorkload::TQueryInfoList InitData;
673673

674674
const TActorId Parent;
675675
ui64 Tag;
676676
ui32 DurationSeconds;
677+
std::shared_ptr<NYdbWorkload::TWorkloadParams> WorkloadQueryGenParams;
677678
std::shared_ptr<NYdbWorkload::IWorkloadQueryGenerator> WorkloadQueryGen;
678679

679680
// Monitoring

ydb/library/arrow_clickhouse/Common/Allocator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
#include <cstdlib>
2323
#include <algorithm>
24-
24+
#include <stdexcept>
2525

2626
#include <common/mremap.h>
2727
//#include <common/getPageSize.h>

ydb/library/arrow_parquet/result_set_parquet_printer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include <util/system/types.h>
44

5+
#include <memory>
56
#include <string>
67

78
namespace NYdb {

ydb/library/workload/kv_workload.cpp

Lines changed: 101 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#include "kv_workload.h"
2-
#include "format"
3-
#include "util/random/random.h"
4-
2+
#include "workload_factory.h"
3+
#include <util/generic/serialized_enum.h>
4+
#include <util/random/random.h>
55
#include <util/datetime/base.h>
66

77
#include <ydb/core/util/lz4_data_generator.h>
@@ -13,15 +13,12 @@
1313
#include <random>
1414
#include <sstream>
1515
#include <chrono>
16-
17-
template <>
18-
void Out<NYdbWorkload::KvWorkloadConstants>(IOutputStream& out, NYdbWorkload::KvWorkloadConstants constant)
19-
{
20-
out << static_cast<ui64>(constant);
21-
}
16+
#include <format>
2217

2318
namespace NYdbWorkload {
2419

20+
TWorkloadFactory::TRegistrator<TKvWorkloadParams> KvRegistrar("kv");
21+
2522
using TRow = TKvWorkloadGenerator::TRow;
2623

2724
// Note: there is no mechanism to update row values for now so all keys should be different
@@ -129,7 +126,7 @@ void VerifyRows(const TRow& checkRow, const TVector<TRow>& readRows, TString mes
129126

130127

131128
TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params)
132-
: Params(*params)
129+
: TBase(params)
133130
, BigString(NKikimr::GenDataForLZ4(Params.StringLen))
134131
{
135132
if (Params.MixedChangePartitionsSize) {
@@ -142,10 +139,6 @@ TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params)
142139
Y_ABORT_UNLESS(Params.KeyColumnsCnt <= Params.ColumnsCnt);
143140
}
144141

145-
TKvWorkloadParams* TKvWorkloadGenerator::GetParams() {
146-
return &Params;
147-
}
148-
149142
std::string TKvWorkloadGenerator::GetDDLQueries() const {
150143
std::stringstream ss;
151144

@@ -198,6 +191,16 @@ TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) {
198191
}
199192

200193

194+
TVector<IWorkloadQueryGenerator::TWorkloadType> TKvWorkloadGenerator::GetSupportedWorkloadTypes() const {
195+
TVector<TWorkloadType> result;
196+
result.emplace_back(static_cast<int>(EType::UpsertRandom), "upsert", "Upsert random rows into table");
197+
result.emplace_back(static_cast<int>(EType::InsertRandom), "insert", "Insert random rows into table");
198+
result.emplace_back(static_cast<int>(EType::SelectRandom), "select", "Select rows matching primary key(s)");
199+
result.emplace_back(static_cast<int>(EType::ReadRowsRandom), "read-rows", "ReadRows rows matching primary key(s)");
200+
result.emplace_back(static_cast<int>(EType::Mixed), "mixed", "Writes and SELECT/ReadsRows rows randomly, verifies them");
201+
return result;
202+
}
203+
201204
TQueryInfoList TKvWorkloadGenerator::WriteRows(TString operation, TVector<TRow>&& rows) {
202205
std::stringstream ss;
203206

@@ -449,10 +452,8 @@ TQueryInfoList TKvWorkloadGenerator::GetInitialData() {
449452
return res;
450453
}
451454

452-
std::string TKvWorkloadGenerator::GetCleanDDLQueries() const {
453-
std::string query = "DROP TABLE `" + Params.TableName + "`;";
454-
455-
return query;
455+
TVector<std::string> TKvWorkloadGenerator::GetCleanPaths() const {
456+
return { Params.TableName };
456457
}
457458

458459
TVector<TRow> TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) {
@@ -486,5 +487,86 @@ TVector<TRow> TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) {
486487
return result;
487488
}
488489

490+
void TKvWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) {
491+
opts.SetFreeArgsNum(0);
492+
switch (commandType) {
493+
case TWorkloadParams::ECommandType::Init:
494+
opts.AddLongOption("init-upserts", "count of upserts need to create while table initialization")
495+
.DefaultValue((ui64)KvWorkloadConstants::INIT_ROW_COUNT).StoreResult(&InitRowCount);
496+
opts.AddLongOption("min-partitions", "Minimum partitions for tables.")
497+
.DefaultValue((ui64)KvWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions);
498+
opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).")
499+
.DefaultValue((ui64)KvWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb);
500+
opts.AddLongOption("auto-partition", "Enable auto partitioning by load.")
501+
.DefaultValue((ui64)KvWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad);
502+
opts.AddLongOption("max-first-key", "Maximum value of a first primary key")
503+
.DefaultValue((ui64)KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey);
504+
opts.AddLongOption("len", "String len")
505+
.DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
506+
opts.AddLongOption("cols", "Number of columns")
507+
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
508+
opts.AddLongOption("int-cols", "Number of int columns")
509+
.DefaultValue((ui64)KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
510+
opts.AddLongOption("key-cols", "Number of key columns")
511+
.DefaultValue((ui64)KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
512+
opts.AddLongOption("rows", "Number of rows")
513+
.DefaultValue((ui64)KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
514+
break;
515+
case TWorkloadParams::ECommandType::Run:
516+
opts.AddLongOption("max-first-key", "Maximum value of a first primary key")
517+
.DefaultValue((ui64)KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey);
518+
opts.AddLongOption("int-cols", "Number of int columns")
519+
.DefaultValue((ui64)KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt);
520+
opts.AddLongOption("key-cols", "Number of key columns")
521+
.DefaultValue((ui64)KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt);
522+
switch (static_cast<TKvWorkloadGenerator::EType>(workloadType)) {
523+
case TKvWorkloadGenerator::EType::UpsertRandom:
524+
opts.AddLongOption("len", "String len")
525+
.DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
526+
opts.AddLongOption("cols", "Number of columns to upsert")
527+
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
528+
opts.AddLongOption("rows", "Number of rows to upsert")
529+
.DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
530+
break;
531+
case TKvWorkloadGenerator::EType::InsertRandom:
532+
opts.AddLongOption("len", "String len")
533+
.DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
534+
opts.AddLongOption("cols", "Number of columns to insert")
535+
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
536+
opts.AddLongOption("rows", "Number of rows to insert")
537+
.DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
538+
break;
539+
case TKvWorkloadGenerator::EType::SelectRandom:
540+
case TKvWorkloadGenerator::EType::ReadRowsRandom:
541+
opts.AddLongOption("cols", "Number of columns to select for a single query")
542+
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
543+
opts.AddLongOption("rows", "Number of rows to select for a single query")
544+
.DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt);
545+
break;
546+
case TKvWorkloadGenerator::EType::Mixed:
547+
opts.AddLongOption("len", "String len")
548+
.DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen);
549+
opts.AddLongOption("cols", "Number of columns")
550+
.DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt);
551+
opts.AddLongOption("change-partitions-size", "Apply random changes of AUTO_PARTITIONING_PARTITION_SIZE_MB setting")
552+
.DefaultValue((ui64)KvWorkloadConstants::MIXED_CHANGE_PARTITIONS_SIZE).StoreResult(&MixedChangePartitionsSize);
553+
opts.AddLongOption("do-select", "Do SELECT operations")
554+
.DefaultValue((ui64)KvWorkloadConstants::MIXED_DO_SELECT).StoreResult(&MixedDoSelect);
555+
opts.AddLongOption("do-read-rows", "Do ReadRows operations")
556+
.DefaultValue((ui64)KvWorkloadConstants::MIXED_DO_READ_ROWS).StoreResult(&MixedDoReadRows);
557+
}
558+
break;
559+
case TWorkloadParams::ECommandType::Clean:
560+
break;
561+
}
562+
}
489563

490-
}
564+
THolder<IWorkloadQueryGenerator> TKvWorkloadParams::CreateGenerator() const {
565+
return MakeHolder<TKvWorkloadGenerator>(this);
566+
}
567+
568+
TString TKvWorkloadParams::GetWorkloadName() const {
569+
return "Key-Value";
570+
}
571+
572+
}

0 commit comments

Comments
 (0)