diff --git a/ydb/apps/ydb/main.cpp b/ydb/apps/ydb/main.cpp index 25839e33ef6b..7d2542019238 100644 --- a/ydb/apps/ydb/main.cpp +++ b/ydb/apps/ydb/main.cpp @@ -1,13 +1,5 @@ -#include #include - -TVector NYdb::NConsoleClient::InitAllowedCodecs() { - return TVector{ - NYdb::NTopic::ECodec::RAW, - NYdb::NTopic::ECodec::ZSTD, - NYdb::NTopic::ECodec::GZIP, - }; -} +#include int main(int argc, char **argv) { try { diff --git a/ydb/core/kqp/ut/perf/kqp_workload_ut.cpp b/ydb/core/kqp/ut/perf/kqp_workload_ut.cpp index 3a312612da17..44111e1f5f3d 100644 --- a/ydb/core/kqp/ut/perf/kqp_workload_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_workload_ut.cpp @@ -4,9 +4,12 @@ #include #include +#include #include +#include #include +#include namespace NKikimr::NKqp { @@ -28,30 +31,24 @@ void ExecuteQuery(TTableClient& db, TSession& session, NYdbWorkload::TQueryInfo& } } -void Test(NYdbWorkload::EWorkload workloadType) { +void Test(const TString& workloadType) { auto settings = TKikimrSettings().SetWithSampleTables(false); auto kikimr = TKikimrRunner{settings}; auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - std::unique_ptr params; - if (workloadType == NYdbWorkload::EWorkload::STOCK) { - auto stockParams = std::make_unique(); + auto params = NYdbWorkload::TWorkloadFactory::MakeHolder(workloadType); + UNIT_ASSERT(params); + if (auto* stockParams = dynamic_cast(params.Get())) { stockParams->ProductCount = 100; stockParams->Quantity = 1000; stockParams->OrderCount = 100; stockParams->Limit = 10; stockParams->MinPartitions = 40; stockParams->PartitionsByLoad = true; - params = std::move(stockParams); - } else if (workloadType == NYdbWorkload::EWorkload::KV) { - params = std::make_unique(); - } else { - UNIT_ASSERT(false); } - UNIT_ASSERT(params); params->DbPath = "/Root"; - auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get()); + auto workloadQueryGen = params->CreateGenerator(); auto result = session.ExecuteSchemeQuery(workloadQueryGen->GetDDLQueries()).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); @@ -62,17 +59,17 @@ void Test(NYdbWorkload::EWorkload workloadType) { UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } int maxType = 0; - if (workloadType == NYdbWorkload::EWorkload::STOCK) { - maxType = static_cast(NYdbWorkload::TStockWorkloadGenerator::EType::MaxType); - } else if (workloadType == NYdbWorkload::EWorkload::KV) { - maxType = static_cast(NYdbWorkload::TKvWorkloadGenerator::EType::MaxType); + if (workloadType == "stock") { + maxType = GetEnumItemsCount(); + } else if (workloadType == "kv") { + maxType = GetEnumItemsCount(); } for (int type = 0; type < maxType; ++type) { size_t InFlight = 10; NPar::LocalExecutor().RunAdditionalThreads(InFlight); NPar::LocalExecutor().ExecRange([&db, type, ¶ms, workloadType](int /*id*/) { TTimer t; - auto workloadQueryGen = NYdbWorkload::TWorkloadFactory().GetWorkloadQueryGenerator(workloadType, params.get()); + auto workloadQueryGen = params->CreateGenerator(); auto session = db.CreateSession().GetValueSync().GetSession(); for (size_t i = 0; i < REPEATS; ++i) { auto queriesList = workloadQueryGen->GetWorkload(type); @@ -88,11 +85,11 @@ void Test(NYdbWorkload::EWorkload workloadType) { Y_UNIT_TEST_SUITE(KqpWorkload) { Y_UNIT_TEST(STOCK) { - Test(NYdbWorkload::EWorkload::STOCK); + Test("stock"); } Y_UNIT_TEST(KV) { - Test(NYdbWorkload::EWorkload::KV); + Test("kv"); } } } diff --git a/ydb/core/load_test/kqp.cpp b/ydb/core/load_test/kqp.cpp index 896fbe1d8bfa..d7f9be86bd06 100644 --- a/ydb/core/load_test/kqp.cpp +++ b/ydb/core/load_test/kqp.cpp @@ -254,30 +254,30 @@ class TKqpLoadActor : public TActorBootstrapped { IncreaseSessions = cmd.GetIncreaseSessions(); Total = std::make_unique(); - NYdbWorkload::TWorkloadFactory factory; if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoad::WorkloadCase::kStock) { - WorkloadClass = NYdbWorkload::EWorkload::STOCK; - NYdbWorkload::TStockWorkloadParams params; - params.PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad(); - params.OrderCount = cmd.GetStock().GetOrderCount(); - params.ProductCount = cmd.GetStock().GetProductCount(); - params.Quantity = cmd.GetStock().GetQuantity(); - params.Limit = cmd.GetStock().GetLimit(); - params.DbPath = WorkingDir; - params.MinPartitions = UniformPartitionsCount; - WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); + WorkloadClass = "stock"; + auto params = std::make_shared(); + params->PartitionsByLoad = cmd.GetStock().GetPartitionsByLoad(); + params->OrderCount = cmd.GetStock().GetOrderCount(); + params->ProductCount = cmd.GetStock().GetProductCount(); + params->Quantity = cmd.GetStock().GetQuantity(); + params->Limit = cmd.GetStock().GetLimit(); + params->DbPath = WorkingDir; + params->MinPartitions = UniformPartitionsCount; + WorkloadQueryGen = std::make_shared(params.get()); + WorkloadQueryGenParams = params; } else if (cmd.Workload_case() == NKikimr::TEvLoadTestRequest_TKqpLoad::WorkloadCase::kKv) { - WorkloadClass = NYdbWorkload::EWorkload::KV; - NYdbWorkload::TKvWorkloadParams params; - params.InitRowCount = cmd.GetKv().GetInitRowCount(); - params.PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad(); - params.MaxFirstKey = cmd.GetKv().GetMaxFirstKey(); - params.StringLen = cmd.GetKv().GetStringLen(); - params.ColumnsCnt = cmd.GetKv().GetColumnsCnt(); - params.RowsCnt = cmd.GetKv().GetRowsCnt(); - params.MinPartitions = UniformPartitionsCount; - params.DbPath = WorkingDir; - WorkloadQueryGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); + WorkloadClass = "kv"; + auto params = std::make_shared(); + params->InitRowCount = cmd.GetKv().GetInitRowCount(); + params->PartitionsByLoad = cmd.GetKv().GetPartitionsByLoad(); + params->MaxFirstKey = cmd.GetKv().GetMaxFirstKey(); + params->StringLen = cmd.GetKv().GetStringLen(); + params->ColumnsCnt = cmd.GetKv().GetColumnsCnt(); + params->RowsCnt = cmd.GetKv().GetRowsCnt(); + params->MinPartitions = UniformPartitionsCount; + WorkloadQueryGen = std::make_shared(params.get()); + WorkloadQueryGenParams = params; } else { return; } @@ -301,8 +301,8 @@ class TKqpLoadActor : public TActorBootstrapped { Become(&TKqpLoadActor::StateStart); - if (WorkloadClass == NYdbWorkload::EWorkload::STOCK) { - NYdbWorkload::TStockWorkloadParams* params = static_cast(WorkloadQueryGen->GetParams()); + if (WorkloadClass == "stock") { + NYdbWorkload::TStockWorkloadParams* params = static_cast(WorkloadQueryGenParams.get()); LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload STOCK, Params: {" << "PartitionsByLoad: " << params->PartitionsByLoad << " " << "OrderCount: " << params->OrderCount << " " @@ -311,8 +311,8 @@ class TKqpLoadActor : public TActorBootstrapped { << "Limit: " << params->Limit << " " << "DbPath: " << params->DbPath << " " << "MinPartitions: " << params->MinPartitions); - } else if (WorkloadClass == NYdbWorkload::EWorkload::KV) { - NYdbWorkload::TKvWorkloadParams* params = static_cast(WorkloadQueryGen->GetParams()); + } else if (WorkloadClass == "kv") { + NYdbWorkload::TKvWorkloadParams* params = static_cast(WorkloadQueryGenParams.get()); LOG_INFO_S(ctx, NKikimrServices::KQP_LOAD_TEST, "Tag# " << Tag << " Starting load actor with workload KV, Params: {" << "InitRowCount: " << params->InitRowCount << " " << "PartitionsByLoad: " << params->PartitionsByLoad << " " @@ -666,7 +666,7 @@ class TKqpLoadActor : public TActorBootstrapped { size_t NumOfSessions = 0; bool IncreaseSessions = false; size_t ResultsReceived = 0; - NYdbWorkload::EWorkload WorkloadClass; + TString WorkloadClass; NKikimrKqp::EQueryType QueryType; NYdbWorkload::TQueryInfoList InitData; @@ -674,6 +674,7 @@ class TKqpLoadActor : public TActorBootstrapped { const TActorId Parent; ui64 Tag; ui32 DurationSeconds; + std::shared_ptr WorkloadQueryGenParams; std::shared_ptr WorkloadQueryGen; // Monitoring diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp index 95cb2f3541e7..c98857470fac 100644 --- a/ydb/library/workload/kv_workload.cpp +++ b/ydb/library/workload/kv_workload.cpp @@ -1,7 +1,7 @@ #include "kv_workload.h" -#include "format" -#include "util/random/random.h" - +#include "workload_factory.h" +#include +#include #include #include @@ -13,15 +13,12 @@ #include #include #include - -template <> -void Out(IOutputStream& out, NYdbWorkload::KvWorkloadConstants constant) -{ - out << static_cast(constant); -} +#include namespace NYdbWorkload { +TWorkloadFactory::TRegistrator KvRegistrar("kv"); + using TRow = TKvWorkloadGenerator::TRow; // Note: there is no mechanism to update row values for now so all keys should be different @@ -129,7 +126,7 @@ void VerifyRows(const TRow& checkRow, const TVector& readRows, TString mes TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params) - : Params(*params) + : TBase(params) , BigString(NKikimr::GenDataForLZ4(Params.StringLen)) { if (Params.MixedChangePartitionsSize) { @@ -142,10 +139,6 @@ TKvWorkloadGenerator::TKvWorkloadGenerator(const TKvWorkloadParams* params) Y_ABORT_UNLESS(Params.KeyColumnsCnt <= Params.ColumnsCnt); } -TKvWorkloadParams* TKvWorkloadGenerator::GetParams() { - return &Params; -} - std::string TKvWorkloadGenerator::GetDDLQueries() const { std::stringstream ss; @@ -198,6 +191,16 @@ TQueryInfoList TKvWorkloadGenerator::GetWorkload(int type) { } +TVector TKvWorkloadGenerator::GetSupportedWorkloadTypes() const { + TVector result; + result.emplace_back(static_cast(EType::UpsertRandom), "upsert", "Upsert random rows into table"); + result.emplace_back(static_cast(EType::InsertRandom), "insert", "Insert random rows into table"); + result.emplace_back(static_cast(EType::SelectRandom), "select", "Select rows matching primary key(s)"); + result.emplace_back(static_cast(EType::ReadRowsRandom), "read-rows", "ReadRows rows matching primary key(s)"); + result.emplace_back(static_cast(EType::Mixed), "mixed", "Writes and SELECT/ReadsRows rows randomly, verifies them"); + return result; +} + TQueryInfoList TKvWorkloadGenerator::WriteRows(TString operation, TVector&& rows) { std::stringstream ss; @@ -449,10 +452,8 @@ TQueryInfoList TKvWorkloadGenerator::GetInitialData() { return res; } -std::string TKvWorkloadGenerator::GetCleanDDLQueries() const { - std::string query = "DROP TABLE `" + Params.TableName + "`;"; - - return query; +TVector TKvWorkloadGenerator::GetCleanPaths() const { + return { Params.TableName }; } TVector TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) { @@ -486,5 +487,86 @@ TVector TKvWorkloadGenerator::GenerateRandomRows(bool randomValues) { return result; } +void TKvWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) { + opts.SetFreeArgsNum(0); + switch (commandType) { + case TWorkloadParams::ECommandType::Init: + opts.AddLongOption("init-upserts", "count of upserts need to create while table initialization") + .DefaultValue((ui64)KvWorkloadConstants::INIT_ROW_COUNT).StoreResult(&InitRowCount); + opts.AddLongOption("min-partitions", "Minimum partitions for tables.") + .DefaultValue((ui64)KvWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions); + opts.AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).") + .DefaultValue((ui64)KvWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSizeMb); + opts.AddLongOption("auto-partition", "Enable auto partitioning by load.") + .DefaultValue((ui64)KvWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad); + opts.AddLongOption("max-first-key", "Maximum value of a first primary key") + .DefaultValue((ui64)KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); + opts.AddLongOption("len", "String len") + .DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + opts.AddLongOption("cols", "Number of columns") + .DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); + opts.AddLongOption("int-cols", "Number of int columns") + .DefaultValue((ui64)KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + opts.AddLongOption("key-cols", "Number of key columns") + .DefaultValue((ui64)KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + opts.AddLongOption("rows", "Number of rows") + .DefaultValue((ui64)KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + break; + case TWorkloadParams::ECommandType::Run: + opts.AddLongOption("max-first-key", "Maximum value of a first primary key") + .DefaultValue((ui64)KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); + opts.AddLongOption("int-cols", "Number of int columns") + .DefaultValue((ui64)KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); + opts.AddLongOption("key-cols", "Number of key columns") + .DefaultValue((ui64)KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); + switch (static_cast(workloadType)) { + case TKvWorkloadGenerator::EType::UpsertRandom: + opts.AddLongOption("len", "String len") + .DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + opts.AddLongOption("cols", "Number of columns to upsert") + .DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); + opts.AddLongOption("rows", "Number of rows to upsert") + .DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + break; + case TKvWorkloadGenerator::EType::InsertRandom: + opts.AddLongOption("len", "String len") + .DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + opts.AddLongOption("cols", "Number of columns to insert") + .DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); + opts.AddLongOption("rows", "Number of rows to insert") + .DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + break; + case TKvWorkloadGenerator::EType::SelectRandom: + case TKvWorkloadGenerator::EType::ReadRowsRandom: + opts.AddLongOption("cols", "Number of columns to select for a single query") + .DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); + opts.AddLongOption("rows", "Number of rows to select for a single query") + .DefaultValue((ui64)NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); + break; + case TKvWorkloadGenerator::EType::Mixed: + opts.AddLongOption("len", "String len") + .DefaultValue((ui64)KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); + opts.AddLongOption("cols", "Number of columns") + .DefaultValue((ui64)KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); + opts.AddLongOption("change-partitions-size", "Apply random changes of AUTO_PARTITIONING_PARTITION_SIZE_MB setting") + .DefaultValue((ui64)KvWorkloadConstants::MIXED_CHANGE_PARTITIONS_SIZE).StoreResult(&MixedChangePartitionsSize); + opts.AddLongOption("do-select", "Do SELECT operations") + .DefaultValue((ui64)KvWorkloadConstants::MIXED_DO_SELECT).StoreResult(&MixedDoSelect); + opts.AddLongOption("do-read-rows", "Do ReadRows operations") + .DefaultValue((ui64)KvWorkloadConstants::MIXED_DO_READ_ROWS).StoreResult(&MixedDoReadRows); + } + break; + case TWorkloadParams::ECommandType::Clean: + break; + } +} -} \ No newline at end of file +THolder TKvWorkloadParams::CreateGenerator() const { + return MakeHolder(this); +} + +TString TKvWorkloadParams::GetWorkloadName() const { + return "Key-Value"; +} + +} diff --git a/ydb/library/workload/kv_workload.h b/ydb/library/workload/kv_workload.h index 5ea7ca4fe43c..7cf2791cbefa 100644 --- a/ydb/library/workload/kv_workload.h +++ b/ydb/library/workload/kv_workload.h @@ -26,7 +26,11 @@ enum KvWorkloadConstants : ui64 { MIXED_DO_SELECT = true, }; -struct TKvWorkloadParams : public TWorkloadParams { +class TKvWorkloadParams : public TWorkloadParams { +public: + void ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) override; + THolder CreateGenerator() const override; + TString GetWorkloadName() const override; ui64 MinPartitions = KvWorkloadConstants::MIN_PARTITIONS; const ui64 MaxPartitions = KvWorkloadConstants::MAX_PARTITIONS; ui64 PartitionSizeMb = KvWorkloadConstants::PARTITION_SIZE_MB; @@ -46,8 +50,9 @@ struct TKvWorkloadParams : public TWorkloadParams { const std::string TableName = "kv_test"; }; -class TKvWorkloadGenerator : public IWorkloadQueryGenerator { +class TKvWorkloadGenerator final: public TWorkloadQueryGeneratorBase { public: + using TBase = TWorkloadQueryGeneratorBase; struct TRow { TVector Ints; TVector Strings; @@ -69,30 +74,23 @@ class TKvWorkloadGenerator : public IWorkloadQueryGenerator { return Ints == other.Ints && Strings == other.Strings; } }; - - static TKvWorkloadGenerator* New(const TKvWorkloadParams* params) { - return new TKvWorkloadGenerator(params); - } - - virtual ~TKvWorkloadGenerator() {} + TKvWorkloadGenerator(const TKvWorkloadParams* params); std::string GetDDLQueries() const override; TQueryInfoList GetInitialData() override; - std::string GetCleanDDLQueries() const override; + TVector GetCleanPaths() const override; TQueryInfoList GetWorkload(int type) override; - - TKvWorkloadParams* GetParams() override; + TVector GetSupportedWorkloadTypes() const override; enum class EType { UpsertRandom, InsertRandom, SelectRandom, ReadRowsRandom, - Mixed, - MaxType + Mixed }; private: @@ -103,12 +101,9 @@ class TKvWorkloadGenerator : public IWorkloadQueryGenerator { TQueryInfoList ReadRows(TVector&& rows); TQueryInfoList Mixed(); - TKvWorkloadGenerator(const TKvWorkloadParams* params); - TQueryInfo FillKvData() const; TVector GenerateRandomRows(bool randomValues = false); - TKvWorkloadParams Params; TString BigString; std::atomic MixedNextChangePartitionsSize; diff --git a/ydb/library/workload/stock_workload.cpp b/ydb/library/workload/stock_workload.cpp index 05497ba7dcf6..e3e257015c19 100644 --- a/ydb/library/workload/stock_workload.cpp +++ b/ydb/library/workload/stock_workload.cpp @@ -1,6 +1,8 @@ #include "stock_workload.h" +#include "workload_factory.h" #include +#include #include #include @@ -20,9 +22,11 @@ uint64_t getOrderId() { namespace NYdbWorkload { +TWorkloadFactory::TRegistrator StockRegistrar("stock"); + TStockWorkloadGenerator::TStockWorkloadGenerator(const TStockWorkloadParams* params) - : DbPath(params->DbPath) - , Params(*params) + : TBase(params) + , DbPath(params->DbPath) , Rd() , Gen(Rd()) , RandExpDistrib(1.6) @@ -32,10 +36,6 @@ TStockWorkloadGenerator::TStockWorkloadGenerator(const TStockWorkloadParams* par Gen.seed(Now().MicroSeconds()); } -TStockWorkloadParams* TStockWorkloadGenerator::GetParams() { - return &Params; -} - std::string TStockWorkloadGenerator::GetDDLQueries() const { std::string stockPartitionsDdl = ""; std::string ordersPartitionsDdl = "WITH (READ_REPLICAS_SETTINGS = \"per_az:1\")"; @@ -86,14 +86,8 @@ TQueryInfoList TStockWorkloadGenerator::GetInitialData() { return res; } -std::string TStockWorkloadGenerator::GetCleanDDLQueries() const { - std::string clean_query = R"( - DROP TABLE `stock`; - DROP TABLE `orders`; - DROP TABLE `orderLines`; - )"; - - return clean_query; +TVector TStockWorkloadGenerator::GetCleanPaths() const { + return {"stock", "orders", "orderLines"}; } TQueryInfo TStockWorkloadGenerator::FillStockData() const { @@ -138,6 +132,17 @@ TQueryInfoList TStockWorkloadGenerator::GetWorkload(int type) { } } + +TVector TStockWorkloadGenerator::GetSupportedWorkloadTypes() const { + TVector result; + result.emplace_back(static_cast(EType::InsertRandomOrder), "add-rand-order", "Inserts orders with random ID without their processing"); + result.emplace_back(static_cast(EType::SubmitRandomOrder), "put-rand-order", "Submit random orders with processing"); + result.emplace_back(static_cast(EType::SubmitSameOrder), "put-same-order", "Submit orders with same products with processing"); + result.emplace_back(static_cast(EType::GetRandomCustomerHistory), "rand-user-hist", "Selects orders of random customer"); + result.emplace_back(static_cast(EType::GetCustomerHistory), "user-hist", "Selects orders of 10000th customer"); + return result; +} + TQueryInfo TStockWorkloadGenerator::InsertOrder(const uint64_t orderID, const std::string& customer, const TProductsQuantity& products) { std::string query = R"(--!syntax_v1 DECLARE $ido AS UInt64; @@ -301,4 +306,49 @@ TQueryInfoList TStockWorkloadGenerator::GetCustomerHistory() { return res; } +void TStockWorkloadParams::ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) { + opts.SetFreeArgsNum(0); + switch (commandType) { + case TWorkloadParams::ECommandType::Init: + opts.AddLongOption('p', "products", "Product count. Value in 1..500 000.") + .DefaultValue(100).StoreResult(&ProductCount); + opts.AddLongOption('q', "quantity", "Quantity of each product in stock.") + .DefaultValue(1000).StoreResult(&Quantity); + opts.AddLongOption('o', "orders", "Initial orders count.") + .DefaultValue(100).StoreResult(&OrderCount); + opts.AddLongOption("min-partitions", "Minimum partitions for tables.") + .DefaultValue(40).StoreResult(&MinPartitions); + opts.AddLongOption("auto-partition", "Enable auto partitioning by load.") + .DefaultValue(true).StoreResult(&PartitionsByLoad); + opts.AddLongOption("enable-cdc", "Create changefeeds on tables.") + .DefaultValue(false).StoreTrue(&EnableCdc).Hidden(); + break; + case TWorkloadParams::ECommandType::Run: + switch (static_cast(workloadType)) { + case TStockWorkloadGenerator::EType::InsertRandomOrder: + case TStockWorkloadGenerator::EType::SubmitRandomOrder: + case TStockWorkloadGenerator::EType::SubmitSameOrder: + opts.AddLongOption('p', "products", "Products count to use in workload.") + .DefaultValue(100).StoreResult(&ProductCount); + break; + case TStockWorkloadGenerator::EType::GetRandomCustomerHistory: + case TStockWorkloadGenerator::EType::GetCustomerHistory: + opts.AddLongOption('l', "limit", "Number of last orders to select.") + .DefaultValue(10).StoreResult(&Limit); + break; + } + break; + case TWorkloadParams::ECommandType::Clean: + break; + } +} + +THolder TStockWorkloadParams::CreateGenerator() const { + return MakeHolder(this); +} + +TString TStockWorkloadParams::GetWorkloadName() const { + return "stock"; +} + } diff --git a/ydb/library/workload/stock_workload.h b/ydb/library/workload/stock_workload.h index bd662df35053..920424885394 100644 --- a/ydb/library/workload/stock_workload.h +++ b/ydb/library/workload/stock_workload.h @@ -8,7 +8,11 @@ namespace NYdbWorkload { -struct TStockWorkloadParams : public TWorkloadParams { +class TStockWorkloadParams final: public TWorkloadParams { +public: + void ConfigureOpts(NLastGetopt::TOpts& opts, const ECommandType commandType, int workloadType) override; + THolder CreateGenerator() const override; + TString GetWorkloadName() const override; size_t ProductCount = 0; size_t Quantity = 0; size_t OrderCount = 0; @@ -18,32 +22,25 @@ struct TStockWorkloadParams : public TWorkloadParams { bool EnableCdc = false; }; -class TStockWorkloadGenerator : public IWorkloadQueryGenerator { +class TStockWorkloadGenerator final: public TWorkloadQueryGeneratorBase { public: - - static TStockWorkloadGenerator* New(const TStockWorkloadParams* params) { - return new TStockWorkloadGenerator(params); - } - - virtual ~TStockWorkloadGenerator() {} + using TBase = TWorkloadQueryGeneratorBase; + TStockWorkloadGenerator(const TStockWorkloadParams* params); std::string GetDDLQueries() const override; TQueryInfoList GetInitialData() override; - std::string GetCleanDDLQueries() const override; + TVector GetCleanPaths() const override; TQueryInfoList GetWorkload(int type) override; - - TStockWorkloadParams* GetParams() override; - + TVector GetSupportedWorkloadTypes() const override; enum class EType { InsertRandomOrder, SubmitRandomOrder, SubmitSameOrder, GetRandomCustomerHistory, - GetCustomerHistory, - MaxType + GetCustomerHistory }; private: @@ -64,12 +61,9 @@ class TStockWorkloadGenerator : public IWorkloadQueryGenerator { unsigned int GetProductCountInOrder(); TProductsQuantity GenerateOrder(unsigned int productCountInOrder, int quantity); - TStockWorkloadGenerator(const TStockWorkloadParams* params); - TQueryInfo FillStockData() const; std::string DbPath; - TStockWorkloadParams Params; std::random_device Rd; std::mt19937_64 Gen; diff --git a/ydb/library/workload/workload_factory.cpp b/ydb/library/workload/workload_factory.cpp index 11038e697f3c..a23934554789 100644 --- a/ydb/library/workload/workload_factory.cpp +++ b/ydb/library/workload/workload_factory.cpp @@ -1,23 +1 @@ #include "workload_factory.h" - -#include "stock_workload.h" -#include "kv_workload.h" - -namespace NYdbWorkload { - - std::shared_ptr TWorkloadFactory::GetWorkloadQueryGenerator(const EWorkload& type , const TWorkloadParams* params) - { - if (!params) { - throw yexception() << "Params not specified"; - } - - if (type == EWorkload::STOCK) { - return std::shared_ptr(TStockWorkloadGenerator::New(static_cast(params))); - } else if (type == EWorkload::KV) { - return std::shared_ptr(TKvWorkloadGenerator::New(static_cast(params))); - } - - throw yexception() << "Unknown workload"; - } - -} \ No newline at end of file diff --git a/ydb/library/workload/workload_factory.h b/ydb/library/workload/workload_factory.h index a786fae2c4f9..895ed517a433 100644 --- a/ydb/library/workload/workload_factory.h +++ b/ydb/library/workload/workload_factory.h @@ -2,18 +2,10 @@ #include "workload_query_generator.h" -#include +#include namespace NYdbWorkload { -enum class EWorkload { - STOCK, - KV, -}; + using TWorkloadFactory = NObjectFactory::TObjectFactory; -class TWorkloadFactory { -public: - std::shared_ptr GetWorkloadQueryGenerator(const EWorkload& type, const TWorkloadParams* params); -}; - -} // namespace NYdbWorkload \ No newline at end of file +} // namespace NYdbWorkload diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h index bef3bd2130f7..d4b7691404b6 100644 --- a/ydb/library/workload/workload_query_generator.h +++ b/ydb/library/workload/workload_query_generator.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -37,21 +38,78 @@ struct TQueryInfo { using TQueryInfoList = std::list; -struct TWorkloadParams { - std::string DbPath; +class IBulkDataGenerator { +public: + using TPtr = std::shared_ptr; + virtual ~IBulkDataGenerator() = default; + IBulkDataGenerator(const std::string& table) + : Table(table) + {} + + std::string GetTable() const { + return Table; + } + + virtual TMaybe GenerateDataPortion() = 0; + +private: + std::string Table; }; +using TBulkDataGeneratorList = std::list>; + class IWorkloadQueryGenerator { public: - virtual ~IWorkloadQueryGenerator() {} - + struct TWorkloadType { + int Type = 0; + TString CommandName; + TString Description; + }; +public: + virtual ~IWorkloadQueryGenerator() = default; virtual std::string GetDDLQueries() const = 0; virtual TQueryInfoList GetInitialData() = 0; - virtual std::string GetCleanDDLQueries() const = 0; - + virtual TBulkDataGeneratorList GetBulkInitialData() const { + return {}; + } + virtual TVector GetCleanPaths() const = 0; virtual TQueryInfoList GetWorkload(int type) = 0; + virtual TVector GetSupportedWorkloadTypes() const = 0; + std::string GetCleanDDLQueries() const { + std::string cleanQuery; + for (const auto& table : GetCleanPaths()) { + cleanQuery += "DROP TABLE `" + table + "`;"; + } + return cleanQuery; + }; +}; + +class TWorkloadParams { +public: + enum class ECommandType { + Init /* "init" */, + Run /* "run" */, + Clean /* "clean" */ + }; + virtual ~TWorkloadParams() = default; + virtual void ConfigureOpts(NLastGetopt::TOpts& /*opts*/, const ECommandType /*commandType*/, int /*workloadType*/) { + }; + virtual THolder CreateGenerator() const = 0; + virtual TString GetWorkloadName() const = 0; - virtual TWorkloadParams* GetParams() = 0; +public: + std::string DbPath; +}; + +template +class TWorkloadQueryGeneratorBase: public IWorkloadQueryGenerator { +public: + using TParams = TP; + TWorkloadQueryGeneratorBase(const TParams* params) + : Params(*params) + {} +protected: + const TParams& Params; }; } // namespace NYdbWorkload diff --git a/ydb/library/workload/ya.make b/ydb/library/workload/ya.make index 1e593d3d9766..880d56d6d918 100644 --- a/ydb/library/workload/ya.make +++ b/ydb/library/workload/ya.make @@ -1,14 +1,18 @@ LIBRARY() SRCS( - stock_workload.cpp - kv_workload.cpp + GLOBAL stock_workload.cpp + GLOBAL kv_workload.cpp workload_factory.cpp ) PEERDIR( + library/cpp/getopt ydb/public/api/protos ydb/public/sdk/cpp/client/ydb_table ) +GENERATE_ENUM_SERIALIZATION_WITH_HEADER(kv_workload.h) +GENERATE_ENUM_SERIALIZATION_WITH_HEADER(stock_workload.h) + END() diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp b/ydb/public/lib/ydb_cli/commands/kv_workload.cpp deleted file mode 100644 index fa8a01277153..000000000000 --- a/ydb/public/lib/ydb_cli/commands/kv_workload.cpp +++ /dev/null @@ -1,338 +0,0 @@ -#include "kv_workload.h" - -#include -#include -#include - -namespace NYdb::NConsoleClient { - -TCommandKv::TCommandKv() - : TClientCommandTree("kv", {}, "YDB Key-Value workload") -{ - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); -} - -TCommandKvInit::TCommandKvInit() - : TWorkloadCommand("init", {}, "Create and initialize a table for workload") - , InitRowCount(NYdbWorkload::KvWorkloadConstants::INIT_ROW_COUNT) - , MinPartitions(NYdbWorkload::KvWorkloadConstants::MIN_PARTITIONS) - , MaxFirstKey(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY) - , StringLen(NYdbWorkload::KvWorkloadConstants::STRING_LEN) - , ColumnsCnt(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT) - , IntColumnsCnt(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT) - , KeyColumnsCnt(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT) - , RowsCnt(NYdbWorkload::KvWorkloadConstants::ROWS_CNT) - , PartitionsByLoad(NYdbWorkload::KvWorkloadConstants::PARTITIONS_BY_LOAD) -{} - -void TCommandKvInit::Config(TConfig& config) { - TYdbCommand::Config(config); - - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption("init-upserts", "count of upserts need to create while table initialization") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::INIT_ROW_COUNT).StoreResult(&InitRowCount); - config.Opts->AddLongOption("min-partitions", "Minimum partitions for tables.") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIN_PARTITIONS).StoreResult(&MinPartitions); - config.Opts->AddLongOption("partition-size", "Maximum partition size in megabytes (AUTO_PARTITIONING_PARTITION_SIZE_MB).") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::PARTITION_SIZE_MB).StoreResult(&PartitionSize); - config.Opts->AddLongOption("auto-partition", "Enable auto partitioning by load.") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::PARTITIONS_BY_LOAD).StoreResult(&PartitionsByLoad); - config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); - config.Opts->AddLongOption("len", "String len") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); - config.Opts->AddLongOption("cols", "Number of columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); - config.Opts->AddLongOption("int-cols", "Number of int columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); - config.Opts->AddLongOption("key-cols", "Number of key columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - config.Opts->AddLongOption("rows", "Number of rows") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); -} - -void TCommandKvInit::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandKvInit::Run(TConfig& config) { - Driver = std::make_unique(CreateDriver(config)); - TableClient = std::make_unique(*Driver); - NYdbWorkload::TKvWorkloadParams params; - params.DbPath = config.Database; - params.InitRowCount = InitRowCount; - params.MinPartitions = MinPartitions; - params.PartitionSizeMb = PartitionSize; - params.PartitionsByLoad = PartitionsByLoad; - params.MaxFirstKey = MaxFirstKey; - params.StringLen = StringLen; - params.ColumnsCnt = ColumnsCnt; - params.IntColumnsCnt = IntColumnsCnt; - params.KeyColumnsCnt = KeyColumnsCnt; - params.RowsCnt = RowsCnt; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - - return InitTables(workloadGen); -} - - -TCommandKvClean::TCommandKvClean() - : TWorkloadCommand("clean", {}, "Drop table created in init phase") {} - -void TCommandKvClean::Config(TConfig& config) { - TWorkloadCommand::Config(config); - config.SetFreeArgsNum(0); -} - -void TCommandKvClean::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandKvClean::Run(TConfig& config) { - Driver = std::make_unique(CreateDriver(config)); - TableClient = std::make_unique(*Driver); - NYdbWorkload::TKvWorkloadParams params; - params.DbPath = config.Database; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - - return CleanTables(workloadGen); -} - -TCommandKvRun::TCommandKvRun() - : TClientCommandTree("run", {}, "Run YDB Key-Value workload") -{ - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); -} - -TCommandKvRunUpsertRandom::TCommandKvRunUpsertRandom() - : TWorkloadCommand("upsert", {}, "Upsert random rows into table") -{} - -void TCommandKvRunUpsertRandom::Config(TConfig& config) { - TWorkloadCommand::Config(config); - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); - config.Opts->AddLongOption("len", "String len") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); - config.Opts->AddLongOption("cols", "Number of columns to upsert") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); - config.Opts->AddLongOption("int-cols", "Number of int columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); - config.Opts->AddLongOption("key-cols", "Number of key columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - config.Opts->AddLongOption("rows", "Number of rows to upsert") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); -} - -void TCommandKvRunUpsertRandom::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandKvRunUpsertRandom::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TKvWorkloadParams params; - params.DbPath = config.Database; - params.MaxFirstKey = MaxFirstKey; - params.StringLen = StringLen; - params.ColumnsCnt = ColumnsCnt; - params.IntColumnsCnt = IntColumnsCnt; - params.KeyColumnsCnt = KeyColumnsCnt; - params.RowsCnt = RowsCnt; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TKvWorkloadGenerator::EType::UpsertRandom)); -} - -TCommandKvRunInsertRandom::TCommandKvRunInsertRandom() - : TWorkloadCommand("insert", {}, "Insert random rows into table") -{} - -void TCommandKvRunInsertRandom::Config(TConfig& config) { - TWorkloadCommand::Config(config); - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); - config.Opts->AddLongOption("len", "String len") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); - config.Opts->AddLongOption("cols", "Number of columns insert") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); - config.Opts->AddLongOption("int-cols", "Number of int columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); - config.Opts->AddLongOption("key-cols", "Number of key columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - config.Opts->AddLongOption("rows", "Number of rows to insert") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); -} - -void TCommandKvRunInsertRandom::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandKvRunInsertRandom::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TKvWorkloadParams params; - params.DbPath = config.Database; - params.MaxFirstKey = MaxFirstKey; - params.StringLen = StringLen; - params.ColumnsCnt = ColumnsCnt; - params.IntColumnsCnt = IntColumnsCnt; - params.KeyColumnsCnt = KeyColumnsCnt; - params.RowsCnt = RowsCnt; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TKvWorkloadGenerator::EType::InsertRandom)); -} - -TCommandKvRunSelectRandom::TCommandKvRunSelectRandom() - : TWorkloadCommand("select", {}, "Select rows matching primary key(s)") -{} - -void TCommandKvRunSelectRandom::Config(TConfig& config) { - TWorkloadCommand::Config(config); - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); - config.Opts->AddLongOption("cols", "Number of columns to select for a single query") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); - config.Opts->AddLongOption("int-cols", "Number of int columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); - config.Opts->AddLongOption("key-cols", "Number of key columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - config.Opts->AddLongOption("rows", "Number of rows to select for a single query") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); -} - -void TCommandKvRunSelectRandom::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandKvRunSelectRandom::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TKvWorkloadParams params; - params.DbPath = config.Database; - params.MaxFirstKey = MaxFirstKey; - params.ColumnsCnt = ColumnsCnt; - params.IntColumnsCnt = IntColumnsCnt; - params.KeyColumnsCnt = KeyColumnsCnt; - params.RowsCnt = RowsCnt; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TKvWorkloadGenerator::EType::SelectRandom)); -} - -TCommandKvRunReadRowsRandom::TCommandKvRunReadRowsRandom() - : TWorkloadCommand("read-rows", {}, "ReadRows rows matching primary key(s)") -{} - -void TCommandKvRunReadRowsRandom::Config(TConfig& config) { - TWorkloadCommand::Config(config); - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); - config.Opts->AddLongOption("cols", "Number of columns to select for a single query") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); - config.Opts->AddLongOption("int-cols", "Number of int columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); - config.Opts->AddLongOption("key-cols", "Number of key columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - config.Opts->AddLongOption("rows", "Number of rows to select for a single query") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::ROWS_CNT).StoreResult(&RowsCnt); -} - -void TCommandKvRunReadRowsRandom::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandKvRunReadRowsRandom::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TKvWorkloadParams params; - params.DbPath = config.Database; - params.MaxFirstKey = MaxFirstKey; - params.ColumnsCnt = ColumnsCnt; - params.IntColumnsCnt = IntColumnsCnt; - params.KeyColumnsCnt = KeyColumnsCnt; - params.RowsCnt = RowsCnt; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TKvWorkloadGenerator::EType::ReadRowsRandom)); -} - -TCommandKvRunMixed::TCommandKvRunMixed() - : TWorkloadCommand("mixed", {}, "Writes and SELECT/ReadsRows rows randomly, verifies them") -{} - -void TCommandKvRunMixed::Config(TConfig& config) { - TWorkloadCommand::Config(config); - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption("max-first-key", "Maximum value of a first primary key") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MAX_FIRST_KEY).StoreResult(&MaxFirstKey); - config.Opts->AddLongOption("len", "String len") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::STRING_LEN).StoreResult(&StringLen); - config.Opts->AddLongOption("cols", "Number of columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::COLUMNS_CNT).StoreResult(&ColumnsCnt); - config.Opts->AddLongOption("int-cols", "Number of int columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::INT_COLUMNS_CNT).StoreResult(&IntColumnsCnt); - config.Opts->AddLongOption("key-cols", "Number of key columns") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::KEY_COLUMNS_CNT).StoreResult(&KeyColumnsCnt); - config.Opts->AddLongOption("change-partitions-size", "Apply random changes of AUTO_PARTITIONING_PARTITION_SIZE_MB setting") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_CHANGE_PARTITIONS_SIZE).StoreResult(&ChangePartitionsSize); - config.Opts->AddLongOption("do-select", "Do SELECT operations") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_DO_SELECT).StoreResult(&DoSelect); - config.Opts->AddLongOption("do-read-rows", "Do ReadRows operations") - .DefaultValue(NYdbWorkload::KvWorkloadConstants::MIXED_DO_READ_ROWS).StoreResult(&DoReadRows); -} - -void TCommandKvRunMixed::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandKvRunMixed::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TKvWorkloadParams params; - params.DbPath = config.Database; - params.MaxFirstKey = MaxFirstKey; - params.StringLen = StringLen; - params.ColumnsCnt = ColumnsCnt; - params.IntColumnsCnt = IntColumnsCnt; - params.KeyColumnsCnt = KeyColumnsCnt; - params.MixedChangePartitionsSize = ChangePartitionsSize; - params.MixedDoReadRows = DoReadRows; - params.MixedDoSelect = DoSelect; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::KV, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TKvWorkloadGenerator::EType::Mixed)); -} - -} // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/kv_workload.h b/ydb/public/lib/ydb_cli/commands/kv_workload.h deleted file mode 100644 index c1eaa49f72dd..000000000000 --- a/ydb/public/lib/ydb_cli/commands/kv_workload.h +++ /dev/null @@ -1,132 +0,0 @@ -#pragma once - -#include "ydb/public/lib/ydb_cli/commands/ydb_workload.h" - -namespace NYdb { -namespace NConsoleClient { - -class TCommandKv : public TClientCommandTree { -public: - TCommandKv(); -}; - -class TCommandKvInit : public TWorkloadCommand { -public: - TCommandKvInit(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - ui64 InitRowCount; - ui64 MinPartitions; - ui64 PartitionSize; - ui64 MaxFirstKey; - ui64 StringLen; - ui64 ColumnsCnt; - ui64 IntColumnsCnt; - ui64 KeyColumnsCnt; - ui64 RowsCnt; - bool PartitionsByLoad; -}; - -class TCommandKvClean : public TWorkloadCommand { -public: - TCommandKvClean(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; -}; - -class TCommandKvRun : public TClientCommandTree { -public: - TCommandKvRun(); -}; - -class TCommandKvRunUpsertRandom : public TWorkloadCommand { -public: - TCommandKvRunUpsertRandom(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - ui64 MaxFirstKey; - ui64 StringLen; - ui64 ColumnsCnt; - ui64 IntColumnsCnt; - ui64 KeyColumnsCnt; - ui64 RowsCnt; - -}; - -class TCommandKvRunInsertRandom : public TWorkloadCommand { -public: - TCommandKvRunInsertRandom(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - ui64 MaxFirstKey; - ui64 StringLen; - ui64 ColumnsCnt; - ui64 IntColumnsCnt; - ui64 KeyColumnsCnt; - ui64 RowsCnt; - -}; - -class TCommandKvRunSelectRandom : public TWorkloadCommand { -public: - TCommandKvRunSelectRandom(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - ui64 MaxFirstKey; - ui64 ColumnsCnt; - ui64 IntColumnsCnt; - ui64 KeyColumnsCnt; - ui64 RowsCnt; - -}; - -class TCommandKvRunReadRowsRandom : public TWorkloadCommand { -public: - TCommandKvRunReadRowsRandom(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - ui64 MaxFirstKey; - ui64 ColumnsCnt; - ui64 IntColumnsCnt; - ui64 KeyColumnsCnt; - ui64 RowsCnt; - -}; - -class TCommandKvRunMixed : public TWorkloadCommand { -public: - TCommandKvRunMixed(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - ui64 MaxFirstKey; - ui64 StringLen; - ui64 ColumnsCnt; - ui64 IntColumnsCnt; - ui64 KeyColumnsCnt; - bool ChangePartitionsSize; - bool DoReadRows; - bool DoSelect; - -}; - -} -} diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp b/ydb/public/lib/ydb_cli/commands/stock_workload.cpp deleted file mode 100644 index f7b1ff8a098a..000000000000 --- a/ydb/public/lib/ydb_cli/commands/stock_workload.cpp +++ /dev/null @@ -1,265 +0,0 @@ -#include "stock_workload.h" - -#include -#include -#include - -namespace NYdb::NConsoleClient { - -NTable::TSession TWorkloadCommand::GetSession() { - NTable::TCreateSessionResult result = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync(); - ThrowOnError(result); - return result.GetSession(); -} - -TCommandStock::TCommandStock() - : TClientCommandTree("stock", {}, "YDB stock workload") -{ - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); -} - -TCommandStockInit::TCommandStockInit() - : TWorkloadCommand("init", {}, "Create and initialize tables for workload") - , ProductCount(0) - , Quantity(0) - , MinPartitions(0) - , PartitionsByLoad(true) - , EnableCdc(false) -{} - -void TCommandStockInit::Config(TConfig& config) { - TYdbCommand::Config(config); - - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption('p', "products", "Product count. Value in 1..500 000.") - .DefaultValue(100).StoreResult(&ProductCount); - config.Opts->AddLongOption('q', "quantity", "Quantity of each product in stock.") - .DefaultValue(1000).StoreResult(&Quantity); - config.Opts->AddLongOption('o', "orders", "Initial orders count.") - .DefaultValue(100).StoreResult(&OrderCount); - config.Opts->AddLongOption("min-partitions", "Minimum partitions for tables.") - .DefaultValue(40).StoreResult(&MinPartitions); - config.Opts->AddLongOption("auto-partition", "Enable auto partitioning by load.") - .DefaultValue(true).StoreResult(&PartitionsByLoad); - config.Opts->AddLongOption("enable-cdc", "Create changefeeds on tables.") - .DefaultValue(false).StoreTrue(&EnableCdc).Hidden(); -} - -void TCommandStockInit::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandStockInit::Run(TConfig& config) { - if (ProductCount > 500'000) { - throw TMisuseException() << "Product count must be in range 1..500 000." << Endl; - } - - Driver = std::make_unique(CreateDriver(config)); - TableClient = std::make_unique(*Driver); - NYdbWorkload::TStockWorkloadParams params; - params.DbPath = config.Database; - params.ProductCount = ProductCount; - params.Quantity = Quantity; - params.OrderCount = OrderCount; - params.MinPartitions = MinPartitions; - params.PartitionsByLoad = PartitionsByLoad; - params.EnableCdc = EnableCdc; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - - return InitTables(workloadGen); -} - -TCommandStockClean::TCommandStockClean() - : TWorkloadCommand("clean", {}, "Drop tables created in init phase") {} - -void TCommandStockClean::Config(TConfig& config) { - TWorkloadCommand::Config(config); - config.SetFreeArgsNum(0); -} - -void TCommandStockClean::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandStockClean::Run(TConfig& config) { - Driver = std::make_unique(CreateDriver(config)); - TableClient = std::make_unique(*Driver); - NYdbWorkload::TStockWorkloadParams params; - params.DbPath = config.Database; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - - return CleanTables(workloadGen); -} - -TCommandStockRun::TCommandStockRun() - : TClientCommandTree("run", {}, "Run YDB stock workload") -{ - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); -} - -TCommandStockRunInsertRandomOrder::TCommandStockRunInsertRandomOrder() - : TWorkloadCommand("add-rand-order", {}, "Inserts orders with random ID without their processing") - , ProductCount(0) -{} - -void TCommandStockRunInsertRandomOrder::Config(TConfig& config) { - TWorkloadCommand::Config(config); - - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption('p', "products", "Products count to use in workload.") - .DefaultValue(100).StoreResult(&ProductCount); -} - -void TCommandStockRunInsertRandomOrder::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandStockRunInsertRandomOrder::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TStockWorkloadParams params; - params.DbPath = config.Database; - params.ProductCount = ProductCount; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TStockWorkloadGenerator::EType::InsertRandomOrder)); -} - -TCommandStockRunSubmitRandomOrder::TCommandStockRunSubmitRandomOrder() - : TWorkloadCommand("put-rand-order", {}, "Submit random orders with processing") - , ProductCount(0) -{} - -void TCommandStockRunSubmitRandomOrder::Config(TConfig& config) { - TWorkloadCommand::Config(config); - - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption('p', "products", "Products count to use in workload.") - .DefaultValue(100).StoreResult(&ProductCount); -} - -void TCommandStockRunSubmitRandomOrder::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandStockRunSubmitRandomOrder::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TStockWorkloadParams params; - params.DbPath = config.Database; - params.ProductCount = ProductCount; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TStockWorkloadGenerator::EType::SubmitRandomOrder)); -} - -TCommandStockRunSubmitSameOrder::TCommandStockRunSubmitSameOrder() - : TWorkloadCommand("put-same-order", {}, "Submit orders with same products with processing") - , ProductCount(0) -{} - -void TCommandStockRunSubmitSameOrder::Config(TConfig& config) { - TWorkloadCommand::Config(config); - - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption('p', "products", "Products count to use in workload.") - .DefaultValue(100).StoreResult(&ProductCount); -} - -void TCommandStockRunSubmitSameOrder::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandStockRunSubmitSameOrder::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TStockWorkloadParams params; - params.DbPath = config.Database; - params.ProductCount = ProductCount; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TStockWorkloadGenerator::EType::SubmitSameOrder)); -} - -TCommandStockRunGetRandomCustomerHistory::TCommandStockRunGetRandomCustomerHistory() - : TWorkloadCommand("rand-user-hist", {}, "Selects orders of random customer") - , Limit(0) -{} - -void TCommandStockRunGetRandomCustomerHistory::Config(TConfig& config) { - TWorkloadCommand::Config(config); - - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption('l', "limit", "Number of last orders to select.") - .DefaultValue(10).StoreResult(&Limit); -} - -void TCommandStockRunGetRandomCustomerHistory::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandStockRunGetRandomCustomerHistory::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TStockWorkloadParams params; - params.DbPath = config.Database; - params.Limit = Limit; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TStockWorkloadGenerator::EType::GetRandomCustomerHistory)); -} - -TCommandStockRunGetCustomerHistory::TCommandStockRunGetCustomerHistory() - : TWorkloadCommand("user-hist", {}, "Selects orders of 10000th customer") - , Limit(0) -{} - -void TCommandStockRunGetCustomerHistory::Config(TConfig& config) { - TWorkloadCommand::Config(config); - - config.SetFreeArgsNum(0); - - config.Opts->AddLongOption('l', "limit", "Number of last orders to select.") - .DefaultValue(10).StoreResult(&Limit); -} - -void TCommandStockRunGetCustomerHistory::Parse(TConfig& config) { - TClientCommand::Parse(config); -} - -int TCommandStockRunGetCustomerHistory::Run(TConfig& config) { - PrepareForRun(config); - - NYdbWorkload::TStockWorkloadParams params; - params.DbPath = config.Database; - params.Limit = Limit; - - NYdbWorkload::TWorkloadFactory factory; - auto workloadGen = factory.GetWorkloadQueryGenerator(NYdbWorkload::EWorkload::STOCK, ¶ms); - - return RunWorkload(workloadGen, static_cast(NYdbWorkload::TStockWorkloadGenerator::EType::GetCustomerHistory)); -} - -} // namespace NYdb::NConsoleClient { diff --git a/ydb/public/lib/ydb_cli/commands/stock_workload.h b/ydb/public/lib/ydb_cli/commands/stock_workload.h deleted file mode 100644 index 6c4c71b6a09b..000000000000 --- a/ydb/public/lib/ydb_cli/commands/stock_workload.h +++ /dev/null @@ -1,99 +0,0 @@ -#pragma once - -#include "ydb/public/lib/ydb_cli/commands/ydb_workload.h" - -namespace NYdb { -namespace NConsoleClient { - -class TCommandStock : public TClientCommandTree { -public: - TCommandStock(); -}; - -class TCommandStockInit : public TWorkloadCommand { -public: - TCommandStockInit(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - size_t ProductCount; - size_t Quantity; - size_t OrderCount; - unsigned int MinPartitions; - bool PartitionsByLoad; - bool EnableCdc; -}; - -class TCommandStockClean : public TWorkloadCommand { -public: - TCommandStockClean(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -}; - -class TCommandStockRun : public TClientCommandTree { -public: - TCommandStockRun(); -}; - -class TCommandStockRunInsertRandomOrder : public TWorkloadCommand { -public: - TCommandStockRunInsertRandomOrder(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - size_t ProductCount; -}; - -class TCommandStockRunSubmitRandomOrder : public TWorkloadCommand { -public: - TCommandStockRunSubmitRandomOrder(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - size_t ProductCount; -}; - -class TCommandStockRunSubmitSameOrder : public TWorkloadCommand { -public: - TCommandStockRunSubmitSameOrder(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - size_t ProductCount; -}; - -class TCommandStockRunGetRandomCustomerHistory : public TWorkloadCommand { -public: - TCommandStockRunGetRandomCustomerHistory(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - unsigned int Limit; -}; - -class TCommandStockRunGetCustomerHistory : public TWorkloadCommand { -public: - TCommandStockRunGetCustomerHistory(); - virtual void Config(TConfig& config) override; - virtual void Parse(TConfig& config) override; - virtual int Run(TConfig& config) override; - -private: - unsigned int Limit; -}; - -} -} diff --git a/ydb/public/lib/ydb_cli/commands/ya.make b/ydb/public/lib/ydb_cli/commands/ya.make index af38f94ce2bc..67a349b823bc 100644 --- a/ydb/public/lib/ydb_cli/commands/ya.make +++ b/ydb/public/lib/ydb_cli/commands/ya.make @@ -5,8 +5,6 @@ SRCS( interactive/line_reader.cpp benchmark_utils.cpp click_bench.cpp - kv_workload.cpp - stock_workload.cpp topic_operations_scenario.cpp topic_read_scenario.cpp topic_write_scenario.cpp diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp index a55d81c6d96f..835c9110ef37 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_topic.cpp @@ -94,7 +94,15 @@ namespace NYdb::NConsoleClient { return description.Str(); } - + + TVector InitAllowedCodecs() { + return TVector{ + NYdb::NTopic::ECodec::RAW, + NYdb::NTopic::ECodec::ZSTD, + NYdb::NTopic::ECodec::GZIP, + }; + } + namespace { NTopic::ECodec ParseCodec(const TString& codecStr, const TVector& allowedCodecs) { auto exists = ExistingCodecs.find(to_lower(codecStr)); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index d631f8d7673c..98ec3cb4aaf8 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -1,7 +1,5 @@ #include "ydb_workload.h" -#include "stock_workload.h" -#include "kv_workload.h" #include "click_bench.h" #include "tpch.h" #include "topic_workload/topic_workload.h" @@ -11,10 +9,14 @@ #include #include +#include +#include #include #include +#include +#include #include #include @@ -42,13 +44,14 @@ TWorkloadStats GetWorkloadStats(const NHdr::THistogram& hdr) { TCommandWorkload::TCommandWorkload() : TClientCommandTree("workload", {}, "YDB workload service") { - AddCommand(std::make_unique()); - AddCommand(std::make_unique()); AddCommand(std::make_unique()); AddCommand(std::make_unique()); AddCommand(std::make_unique()); AddCommand(std::make_unique()); AddCommand(std::make_unique()); + for (const auto& key: NYdbWorkload::TWorkloadFactory::GetRegisteredKeys()) { + AddCommand(std::make_unique(key.c_str())); + } } TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_list& aliases, const TString& description) @@ -123,7 +126,7 @@ void TWorkloadCommand::PrepareForRun(TConfig& config) { } } -void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, const int type) { +void TWorkloadCommand::WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerator& workloadGen, const int type) { const auto dataQuerySettings = NYdb::NTable::TExecDataQuerySettings() .KeepInQueryCache(true) .OperationTimeout(TDuration::MilliSeconds(OperationTimeoutMs)) @@ -195,7 +198,7 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co }; while (Now() < StopTime) { - auto queryInfoList = workloadGen->GetWorkload(type); + auto queryInfoList = workloadGen.GetWorkload(type); if (queryInfoList.empty()) { Cerr << "Task ID: " << taskId << ". No queries to run." << Endl; return; @@ -234,7 +237,7 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co WindowRetryCount += std::max(retryCount, 0); } -int TWorkloadCommand::RunWorkload(TWorkloadQueryGenPtr workloadGen, const int type) { +int TWorkloadCommand::RunWorkload(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, const int type) { if (!Quiet) { std::cout << "Window\tTxs/Sec\tRetries\tErrors\tp50(ms)\tp95(ms)\tp99(ms)\tpMax(ms)"; if (PrintTimestamp) { @@ -298,12 +301,113 @@ void TWorkloadCommand::PrintWindowStats(int windowIt) { } } -int TWorkloadCommand::InitTables(std::shared_ptr workloadGen) { +namespace { + bool WaitBulk(const NYdb::NTable::TAsyncBulkUpsertResult& prevResult, TAtomic& errors, const std::string& table, TAdaptiveLock& lock) { + if (prevResult.Initialized()) { + try { + const auto& res = prevResult.GetValueSync(); + if (!res.IsSuccess()) { + auto g = Guard(lock); + Cerr << "Bulk upset to " << table << " failed: " << res.GetIssues().ToString() << Endl; + AtomicIncrement(errors); + return false; + } + } catch (...) { + auto g = Guard(lock); + Cerr << "Bulk upset to " << table << " failed: " << CurrentExceptionMessage() << ", backtrace: "; + PrintBackTrace(); + AtomicIncrement(errors); + return false; + } + } + return true; + } +} + +TWorkloadCommandInit::TWorkloadCommandInit(const TString& key) + : TWorkloadCommandBase("init", key, NYdbWorkload::TWorkloadParams::ECommandType::Init, "Create and initialize tables for workload") +{} + +int TWorkloadCommandInit::Run(TConfig& config) { + Driver = std::make_unique(CreateDriver(config)); + TableClient = std::make_unique(*Driver); + Params->DbPath = config.Database; + auto workloadGen = Params->CreateGenerator(); + return InitTables(*workloadGen); +} + +void TWorkloadCommandInit::Config(TConfig& config) { + TYdbCommand::Config(config); + Params->ConfigureOpts(*config.Opts, CommandType, Type); +} + +TWorkloadCommandRun::TWorkloadCommandRun(const TString& key, const NYdbWorkload::IWorkloadQueryGenerator::TWorkloadType& workload) + : TWorkloadCommandBase(workload.CommandName, key, NYdbWorkload::TWorkloadParams::ECommandType::Run, workload.Description) +{ + Type = workload.Type; +} + +int TWorkloadCommandRun::Run(TConfig& config) { + PrepareForRun(config); + Params->DbPath = config.Database; + auto workloadGen = Params->CreateGenerator(); + return RunWorkload(*workloadGen, Type); +} + + +TWorkloadCommandClean::TWorkloadCommandClean(const TString& key) + : TWorkloadCommandBase("clean", key, NYdbWorkload::TWorkloadParams::ECommandType::Clean, "Drop tables created in init phase") +{} + +int TWorkloadCommandClean::Run(TConfig& config) { + Params->DbPath = config.Database; + auto workloadGen = Params->CreateGenerator(); + return CleanTables(*workloadGen, config); +} + + +TWorkloadCommandBase::TWorkloadCommandBase(const TString& name, const TString& key, const NYdbWorkload::TWorkloadParams::ECommandType commandType, const TString& description /*= TString()*/) + : TWorkloadCommand(name, std::initializer_list(), description) + , CommandType(commandType) + , Params(NYdbWorkload::TWorkloadFactory::MakeHolder(key)) +{} + +void TWorkloadCommandBase::Config(TConfig& config) { + TWorkloadCommand::Config(config); + Params->ConfigureOpts(*config.Opts, CommandType, Type); +} + + +TWorkloadCommandRoot::TWorkloadCommandRoot(const TString& key) + : TClientCommandTree(key, {}, "YDB " + NYdbWorkload::TWorkloadFactory::MakeHolder(key)->GetWorkloadName() + " workload") +{ + AddCommand(std::make_unique(key)); + auto supportedWorkloads = NYdbWorkload::TWorkloadFactory::MakeHolder(key)->CreateGenerator()->GetSupportedWorkloadTypes(); + switch (supportedWorkloads.size()) { + case 0: + break; + case 1: + supportedWorkloads.back().CommandName = "run"; + AddCommand(std::make_unique(key, supportedWorkloads.back())); + break; + default: { + auto run = std::make_unique("run", std::initializer_list(), "Run YDB " + NYdbWorkload::TWorkloadFactory::MakeHolder(key)->GetWorkloadName() + " workload"); + for (const auto& type: supportedWorkloads) { + run->AddCommand(std::make_unique(key, type)); + } + AddCommand(std::move(run)); + break; + } + } + AddCommand(std::make_unique(key)); +} + +int TWorkloadCommand::InitTables(NYdbWorkload::IWorkloadQueryGenerator& workloadGen) { auto session = GetSession(); - auto result = session.ExecuteSchemeQuery(workloadGen->GetDDLQueries()).GetValueSync(); + auto result = session.ExecuteSchemeQuery(workloadGen.GetDDLQueries()).GetValueSync(); ThrowOnError(result); - auto queryInfoList = workloadGen->GetInitialData(); + auto queryInfoList = workloadGen.GetInitialData(); for (auto queryInfo : queryInfoList) { auto prepareResult = session.PrepareDataQuery(queryInfo.Query.c_str()).GetValueSync(); if (!prepareResult.IsSuccess()) { @@ -322,23 +426,48 @@ int TWorkloadCommand::InitTables(std::shared_ptrGenerateDataPortion(); data.Defined() && !AtomicGet(errors); data = dataGen->GenerateDataPortion()) { + if (WaitBulk(prevResult, errors, dataGen->GetTable(), lock)) { + prevResult = TableClient->BulkUpsert(dataGen->GetTable(), std::move(*data)); + } + } + if (WaitBulk(prevResult, errors, dataGen->GetTable(), lock)) { + auto g = Guard(lock); + Cout << "Fill table " << dataGen->GetTable() << "..." << (AtomicGet(errors) ? "Breaked" : "OK" ) << Endl; + } + }); + } + pool.Stop(); + return AtomicGet(errors) ? EXIT_FAILURE : EXIT_SUCCESS; } -int TWorkloadCommand::CleanTables(std::shared_ptr workloadGen) { - auto session = GetSession(); - - auto query = workloadGen->GetCleanDDLQueries(); - TStatus result(EStatus::SUCCESS, NYql::TIssues()); - result = session.ExecuteSchemeQuery(TString(query)).GetValueSync(); - - if (!result.IsSuccess()) { - Cerr << "Query execution failed: " << result.GetIssues().ToString() << Endl - << "Query:\n" << query << Endl; - return EXIT_FAILURE; +int TWorkloadCommand::CleanTables(const NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config) { + auto driver = CreateDriver(config); + NTable::TTableClient tableClient(driver); + NTopic::TTopicClient topicClient(driver); + NScheme::TSchemeClient schemeClient(driver); + auto pathsToDelete = workloadGen.GetCleanPaths(); + NScheme::TRemoveDirectorySettings settings; + for (const auto& path : pathsToDelete) { + auto fullPath = config.Database + "/" + path.c_str(); + ThrowOnError(RemovePathRecursive(schemeClient, tableClient, topicClient, fullPath, ERecursiveRemovePrompt::Never, settings)); } return EXIT_SUCCESS; } +NTable::TSession TWorkloadCommand::GetSession() { + NTable::TCreateSessionResult result = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync(); + ThrowOnError(result); + return result.GetSession(); +} + } // namespace NYdb::NConsoleClient diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.h b/ydb/public/lib/ydb_cli/commands/ydb_workload.h index 26500fab51da..15365b049e2f 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.h @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -10,10 +11,6 @@ #include #include -namespace NYdbWorkload { - class IWorkloadQueryGenerator; -} - namespace NYdb { namespace NConsoleClient { @@ -34,12 +31,10 @@ class TWorkloadCommand : public TYdbCommand { NTable::TSession GetSession(); protected: - using TWorkloadQueryGenPtr = std::shared_ptr; - void PrepareForRun(TConfig& config); - int RunWorkload(TWorkloadQueryGenPtr workloadGen, const int type); - void WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, const int type); + int RunWorkload(NYdbWorkload::IWorkloadQueryGenerator& workloadGen, const int type); + void WorkerFn(int taskId, NYdbWorkload::IWorkloadQueryGenerator& workloadGen, const int type); void PrintWindowStats(int windowIt); std::unique_ptr Driver; @@ -71,10 +66,47 @@ class TWorkloadCommand : public TYdbCommand { std::atomic_uint64_t WindowErrors; protected: - int InitTables(std::shared_ptr workloadGen); + int InitTables(NYdbWorkload::IWorkloadQueryGenerator& workloadGen); + int CleanTables(const NYdbWorkload::IWorkloadQueryGenerator& workloadGen, TConfig& config); +}; - int CleanTables(std::shared_ptr workloadGen); +class TWorkloadCommandBase: public TWorkloadCommand { +public: + TWorkloadCommandBase( + const TString& name, + const TString& key, + const NYdbWorkload::TWorkloadParams::ECommandType commandType, + const TString& description = TString()); + virtual void Config(TConfig& config) override; +protected: + NYdbWorkload::TWorkloadParams::ECommandType CommandType; + THolder Params; + int Type = 0; +}; + +class TWorkloadCommandInit : public TWorkloadCommandBase { +public: + TWorkloadCommandInit(const TString& key); + virtual void Config(TConfig& config) override; + virtual int Run(TConfig& config) override; +}; + +class TWorkloadCommandRun : public TWorkloadCommandBase { +public: + TWorkloadCommandRun(const TString& key, const NYdbWorkload::IWorkloadQueryGenerator::TWorkloadType& workload); + virtual int Run(TConfig& config) override; +}; + +class TWorkloadCommandClean : public TWorkloadCommandBase { +public: + TWorkloadCommandClean(const TString& key); + virtual int Run(TConfig& config) override; +}; + +class TWorkloadCommandRoot : public TClientCommandTree { +public: + TWorkloadCommandRoot(const TString& key); }; }