diff --git a/ydb/core/keyvalue/keyvalue_ut_trace.cpp b/ydb/core/keyvalue/keyvalue_ut_trace.cpp new file mode 100644 index 000000000000..f918a189bdc8 --- /dev/null +++ b/ydb/core/keyvalue/keyvalue_ut_trace.cpp @@ -0,0 +1,154 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +using namespace NActors; +using namespace NKikimr; + +struct TTestEnvironment { + THolder Runtime; + const ui32 NodeCount; + TActorId Edge; + const ui64 TabletId = MakeTabletID(0, 0, 1); + const TTabletTypes::EType TabletType = TTabletTypes::KeyValue; + NWilson::TFakeWilsonUploader* WilsonUploader = nullptr; + + TTestEnvironment(ui32 nodeCount): NodeCount(nodeCount) { + } + + void Prepare() { + SetupRuntime(); + InitializeRuntime(); + + Edge = Runtime->AllocateEdgeActor(); + CreateTestBootstrapper(*Runtime, + CreateTestTabletInfo(TabletId, TabletType, TErasureType::ErasureNone), + &CreateKeyValueFlat); + SetupFakeWilson(); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + Runtime->DispatchEvents(options); + } + + void InitializeRuntime() { + TAppPrepare app; + app.AddDomain(TDomainsInfo::TDomain::ConstructEmptyDomain("dc-1").Release()); + SetupTabletServices(*Runtime, &app); + } + + void SetupRuntime() { + Runtime = MakeHolder(NodeCount, 1u); + + for (ui32 i = 0; i < NodeCount; ++i) { + SetupStateStorage(*Runtime, i, 0, true); + SetupTabletResolver(*Runtime, i); + } + } + + void SetupFakeWilson() { + WilsonUploader = new NWilson::TFakeWilsonUploader; + auto actorId = Runtime->Register(WilsonUploader); + Runtime->RegisterService(NWilson::MakeWilsonUploaderId(), actorId); + } + + template + auto DoKVRequest(THolder request) { + Runtime->SendToPipe(TabletId, Edge, request.Release(), 0, NTabletPipe::TClientConfig(), TActorId(), + 0, NWilson::TTraceId::NewTraceId(15, 4095)); + TAutoPtr handle; + auto response = Runtime->GrabEdgeEventRethrow(handle); + UNIT_ASSERT(response); + auto& record = response->Record; + UNIT_ASSERT_EQUAL(record.status(), NKikimrKeyValue::Statuses::RSTATUS_OK); + + return std::move(record); + } +}; + +THolder CreateWrite(TString key, TString value) { + auto request = MakeHolder(); + auto write = request->Record.add_commands()->mutable_write(); + write->set_key(std::move(key)); + write->set_value(std::move(value)); + return request; +} + +THolder CreateRead(TString key) { + auto request = MakeHolder(); + auto& record = request->Record; + record.set_key(std::move(key)); + record.set_offset(0); + record.set_size(0); + record.set_limit_bytes(0); + return request; +} + +void TestOneWrite(TString value, TString expectedTrace) { + TTestEnvironment env(8); + env.Prepare(); + + env.DoKVRequest(CreateWrite("key", std::move(value))); + + UNIT_ASSERT(env.WilsonUploader->BuildTraceTrees()); + UNIT_ASSERT_EQUAL(env.WilsonUploader->Traces.size(), 1); + auto& trace = env.WilsonUploader->Traces.begin()->second; + + UNIT_ASSERT_EQUAL(trace.ToString(), expectedTrace); +} + +void TestOneRead(TString value, TString expectedTrace) { + TTestEnvironment env(8); + env.Prepare(); + + env.DoKVRequest(CreateWrite("key", value)); + env.WilsonUploader->Clear(); + + auto response = env.DoKVRequest(CreateRead("key")); + UNIT_ASSERT_EQUAL(response.value(), value); + + UNIT_ASSERT(env.WilsonUploader->BuildTraceTrees()); + UNIT_ASSERT_EQUAL(env.WilsonUploader->Traces.size(), 1); + auto& trace = env.WilsonUploader->Traces.begin()->second; + + UNIT_ASSERT_EQUAL(trace.ToString(), expectedTrace); +} + +Y_UNIT_TEST_SUITE(TKeyValueTracingTest) { + const TString SmallValue = "value"; + const TString HugeValue = TString(1 << 20, 'v'); + +Y_UNIT_TEST(WriteSmall) { + TString canon = "(KeyValue.Intermediate -> [(KeyValue.StorageRequest -> [(DSProxy.Put -> [(Backpressure.InFlight " + "-> [(VDisk.Log.Put)])])]) , (Tablet.Transaction -> [(Tablet.Transaction.Execute) , (Tablet.WriteLog -> " + "[(Tablet.WriteLog.LogEntry -> [(DSProxy.Put -> [(Backpressure.InFlight -> [(VDisk.Log.Put)])])])])])])"; + TestOneWrite(SmallValue, std::move(canon)); +} + +Y_UNIT_TEST(WriteHuge) { + TString canon = "(KeyValue.Intermediate -> [(KeyValue.StorageRequest -> [(DSProxy.Put -> [(Backpressure.InFlight " + "-> [(VDisk.HugeBlobKeeper.Write -> [(VDisk.Log.PutHuge)])])])]) , (Tablet.Transaction -> " + "[(Tablet.Transaction.Execute) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry -> [(DSProxy.Put -> " + "[(Backpressure.InFlight -> [(VDisk.Log.Put)])])])])])])"; + TestOneWrite(HugeValue, std::move(canon)); +} + +Y_UNIT_TEST(ReadSmall) { + TString canon = "(KeyValue.Intermediate -> [(KeyValue.StorageReadRequest -> [(DSProxy.Get -> [(Backpressure.InFlight -> " + "[(VDisk.LevelIndexExtremeQueryViaBatcherMergeData)])])])])"; + TestOneRead(SmallValue, std::move(canon)); +} + +Y_UNIT_TEST(ReadHuge) { + TString canon = "(KeyValue.Intermediate -> [(KeyValue.StorageReadRequest -> [(DSProxy.Get -> [(Backpressure.InFlight -> " + "[(VDisk.LevelIndexExtremeQueryViaBatcherMergeData -> [(VDisk.Query.ReadBatcher)])])])])])"; + TestOneRead(HugeValue, std::move(canon)); +} + +} diff --git a/ydb/core/keyvalue/ut_trace/ya.make b/ydb/core/keyvalue/ut_trace/ya.make new file mode 100644 index 000000000000..a6c8e0b9d692 --- /dev/null +++ b/ydb/core/keyvalue/ut_trace/ya.make @@ -0,0 +1,26 @@ +UNITTEST_FOR(ydb/core/keyvalue) + +FORK_SUBTESTS() + +SPLIT_FACTOR(5) + +IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) + TIMEOUT(1800) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +PEERDIR( + ydb/core/testlib/default +) + +SRCS( + keyvalue_ut_trace.cpp +) + +REQUIREMENTS(ram:16) + +END() diff --git a/ydb/core/keyvalue/ya.make b/ydb/core/keyvalue/ya.make index f8cfaab1866f..3a014e31316d 100644 --- a/ydb/core/keyvalue/ya.make +++ b/ydb/core/keyvalue/ya.make @@ -62,4 +62,5 @@ RECURSE( RECURSE_FOR_TESTS( ut + ut_trace ) diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 50a5670865de..c8491d4ae513 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include @@ -18,9 +18,9 @@ using namespace NKikimr::NDataShard::NKqpHelpers; using namespace NSchemeShard; using namespace Tests; using namespace NDataShardReadTableTest; +using namespace NWilson; Y_UNIT_TEST_SUITE(TDataShardTrace) { - void ExecSQL(Tests::TServer::TPtr server, TActorId sender, const TString &sql, @@ -108,154 +108,6 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetRef().GetYdbStatus(), code); } - class FakeWilsonUploader : public TActorBootstrapped { - public: - class Span { - public: - Span(TString name, TString parentSpanId, ui64 startTime) : Name(name), ParentSpanId(parentSpanId), StartTime(startTime) {} - - std::optional> FindOne(TString targetName) { - for (const auto childRef : Children) { - if (childRef.get().Name == targetName) { - return childRef; - } - } - - return {}; - } - - std::vector> FindAll(TString targetName) { - std::vector> res; - - for (const auto childRef : Children) { - if (childRef.get().Name == targetName) { - res.emplace_back(childRef); - } - } - - return res; - } - - std::optional> BFSFindOne(TString targetName) { - std::queue> bfsQueue; - bfsQueue.push(std::ref(*this)); - - while (!bfsQueue.empty()) { - Span ¤tSpan = bfsQueue.front().get(); - bfsQueue.pop(); - - if (currentSpan.Name == targetName) { - return currentSpan; - } - - for (const auto childRef : currentSpan.Children) { - bfsQueue.push(childRef); - } - } - - return {}; - } - - static bool CompareByStartTime(const std::reference_wrapper& span1, const std::reference_wrapper& span2) { - return span1.get().StartTime < span2.get().StartTime; - } - - TString Name; - TString ParentSpanId; - ui64 StartTime; - std::set, decltype(&CompareByStartTime)> Children{&CompareByStartTime}; - }; - - class Trace { - public: - std::string ToString() const { - std::string result; - - for (const auto& spanPair : Spans) { - const Span& span = spanPair.second; - if (span.ParentSpanId.empty()) { - result += ToStringHelper(span); - } - } - - return result; - } - private: - std::string ToStringHelper(const Span& span) const { - std::string result = "(" + span.Name; - - if (!span.Children.empty()) { - result += " -> ["; - auto it = span.Children.begin(); - while (it != span.Children.end()) { - const Span& childSpan = it->get(); - result += ToStringHelper(childSpan); - ++it; - - if (it != span.Children.end()) { - result += " , "; - } - } - result += "]"; - } - - result += ")"; - - return result; - } - public: - std::unordered_map Spans; - - Span Root{"Root", "", 0}; - }; - - public: - void Bootstrap() { - Become(&TThis::StateFunc); - } - - void Handle(NWilson::TEvWilson::TPtr ev) { - auto& span = ev->Get()->Span; - const TString &traceId = span.trace_id(); - const TString &spanId = span.span_id(); - const TString &parentSpanId = span.parent_span_id(); - const TString &spanName = span.name(); - ui64 startTime = span.start_time_unix_nano(); - - Trace &trace = Traces[traceId]; - - trace.Spans.try_emplace(spanId, spanName, parentSpanId, startTime); - } - - void BuildTraceTrees() { - for (auto& tracePair : Traces) { - Trace& trace = tracePair.second; - - for (auto& spanPair : trace.Spans) { - Span& span = spanPair.second; - - const TString& parentSpanId = span.ParentSpanId; - - // Check if the span has a parent - if (!parentSpanId.empty()) { - auto parentSpanIt = trace.Spans.find(parentSpanId); - UNIT_ASSERT(parentSpanIt != trace.Spans.end()); - parentSpanIt->second.Children.insert(std::ref(span)); - } else { - trace.Root.Children.insert(std::ref(span)); - } - } - } - } - - STRICT_STFUNC(StateFunc, - hFunc(NWilson::TEvWilson, Handle); - ); - - public: - std::unordered_map Traces; - }; - void SplitTable(TTestActorRuntime &runtime, Tests::TServer::TPtr server, ui64 splitKey) { SetSplitMergePartCountLimit(server->GetRuntime(), -1); auto senderSplit = runtime.AllocateEdgeActor(); @@ -283,21 +135,21 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { return {runtime, server, sender}; } - void CheckTxHasWriteLog(std::reference_wrapper txSpan) { + void CheckTxHasWriteLog(std::reference_wrapper txSpan) { auto writeLogSpan = txSpan.get().FindOne("Tablet.WriteLog"); UNIT_ASSERT(writeLogSpan); auto writeLogEntrySpan = writeLogSpan->get().FindOne("Tablet.WriteLog.LogEntry"); UNIT_ASSERT(writeLogEntrySpan); } - void CheckTxHasDatashardUnits(std::reference_wrapper txSpan, ui8 count) { + void CheckTxHasDatashardUnits(std::reference_wrapper txSpan, ui8 count) { auto executeSpan = txSpan.get().FindOne("Tablet.Transaction.Execute"); UNIT_ASSERT(executeSpan); auto unitSpans = executeSpan->get().FindAll("Datashard.Unit"); UNIT_ASSERT_EQUAL(count, unitSpans.size()); } - void CheckExecuteHasDatashardUnits(std::reference_wrapper executeSpan, ui8 count) { + void CheckExecuteHasDatashardUnits(std::reference_wrapper executeSpan, ui8 count) { auto unitSpans = executeSpan.get().FindAll("Datashard.Unit"); UNIT_ASSERT_EQUAL(count, unitSpans.size()); } @@ -307,7 +159,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { CreateShardedTable(server, sender, "/Root", "table-1", 1, false); - FakeWilsonUploader *uploader = new FakeWilsonUploader(); + TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); @@ -323,11 +175,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { std::move(traceId) ); - uploader->BuildTraceTrees(); - + UNIT_ASSERT(uploader->BuildTraceTrees()); UNIT_ASSERT_EQUAL(1, uploader->Traces.size()); - FakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; + TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; auto deSpan = trace.Root.BFSFindOne("DataExecuter"); UNIT_ASSERT(deSpan); @@ -367,7 +218,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { CreateShardedTable(server, sender, "/Root", "table-1", 1, false); - FakeWilsonUploader *uploader = new FakeWilsonUploader(); + TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); @@ -414,11 +265,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { std::move(traceId) ); - uploader->BuildTraceTrees(); - + UNIT_ASSERT(uploader->BuildTraceTrees()); UNIT_ASSERT_EQUAL(1, uploader->Traces.size()); - FakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; + TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; std::string canon; if (server->GetSettings().AppConfig->GetTableServiceConfig().GetEnableKqpDataQueryStreamLookup()) { @@ -491,7 +341,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { CreateShardedTable(server, sender, "/Root", "table-1", 1, false); - FakeWilsonUploader* uploader = new FakeWilsonUploader(); + TFakeWilsonUploader* uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); @@ -522,11 +372,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { std::move(traceId) ); - uploader->BuildTraceTrees(); - + UNIT_ASSERT(uploader->BuildTraceTrees()); UNIT_ASSERT_EQUAL(1, uploader->Traces.size()); - FakeWilsonUploader::Trace& trace = uploader->Traces.begin()->second; + TFakeWilsonUploader::Trace& trace = uploader->Traces.begin()->second; auto readActorSpan = trace.Root.BFSFindOne("ReadActor"); UNIT_ASSERT(readActorSpan); @@ -551,7 +400,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { auto opts = TShardedTableOptions().Columns({{"key", "Uint32", true, false}, {"value", "Uint32", false, false}}); auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts); - FakeWilsonUploader *uploader = new FakeWilsonUploader(); + TFakeWilsonUploader *uploader = new TFakeWilsonUploader(); TActorId uploaderId = runtime.Register(uploader, 0); runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); runtime.SimulateSleep(TDuration::Seconds(10)); @@ -561,11 +410,10 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { ui64 txId = 100; Write(runtime, sender, shards[0], tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE, NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED, std::move(traceId)); - uploader->BuildTraceTrees(); - + UNIT_ASSERT(uploader->BuildTraceTrees()); UNIT_ASSERT_EQUAL(1, uploader->Traces.size()); - FakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; + TFakeWilsonUploader::Trace &trace = uploader->Traces.begin()->second; auto wtSpan = trace.Root.BFSFindOne("Datashard.WriteTransaction"); UNIT_ASSERT(wtSpan); diff --git a/ydb/library/actors/wilson/test_util/fake_wilson_uploader.h b/ydb/library/actors/wilson/test_util/fake_wilson_uploader.h new file mode 100644 index 000000000000..9afc2a2e68dc --- /dev/null +++ b/ydb/library/actors/wilson/test_util/fake_wilson_uploader.h @@ -0,0 +1,164 @@ +#pragma once + +#include +#include +#include + +namespace NWilson { + + class TFakeWilsonUploader : public NActors::TActorBootstrapped { + public: + class Span { + public: + Span(TString name, TString parentSpanId, ui64 startTime) : Name(name), ParentSpanId(parentSpanId), StartTime(startTime) {} + + std::optional> FindOne(TString targetName) { + for (const auto childRef : Children) { + if (childRef.get().Name == targetName) { + return childRef; + } + } + + return {}; + } + + std::vector> FindAll(TString targetName) { + std::vector> res; + + for (const auto childRef : Children) { + if (childRef.get().Name == targetName) { + res.emplace_back(childRef); + } + } + + return res; + } + + std::optional> BFSFindOne(TString targetName) { + std::queue> bfsQueue; + bfsQueue.push(std::ref(*this)); + + while (!bfsQueue.empty()) { + Span ¤tSpan = bfsQueue.front().get(); + bfsQueue.pop(); + + if (currentSpan.Name == targetName) { + return currentSpan; + } + + for (const auto childRef : currentSpan.Children) { + bfsQueue.push(childRef); + } + } + + return {}; + } + + static bool CompareByStartTime(const std::reference_wrapper& span1, const std::reference_wrapper& span2) { + return span1.get().StartTime < span2.get().StartTime; + } + + TString Name; + TString ParentSpanId; + ui64 StartTime; + std::set, decltype(&CompareByStartTime)> Children{&CompareByStartTime}; + }; + + class Trace { + public: + std::string ToString() const { + std::string result; + + for (const auto& spanPair : Spans) { + const Span& span = spanPair.second; + if (span.ParentSpanId.empty()) { + result += ToStringHelper(span); + } + } + + return result; + } + private: + std::string ToStringHelper(const Span& span) const { + std::string result = "(" + span.Name; + + if (!span.Children.empty()) { + result += " -> ["; + auto it = span.Children.begin(); + while (it != span.Children.end()) { + const Span& childSpan = it->get(); + result += ToStringHelper(childSpan); + ++it; + + if (it != span.Children.end()) { + result += " , "; + } + } + result += "]"; + } + + result += ")"; + + return result; + } + public: + std::unordered_map Spans; + + Span Root{"Root", "", 0}; + }; + + public: + void Bootstrap() { + Become(&TThis::StateFunc); + } + + void Handle(NWilson::TEvWilson::TPtr ev) { + auto& span = ev->Get()->Span; + const TString &traceId = span.trace_id(); + const TString &spanId = span.span_id(); + const TString &parentSpanId = span.parent_span_id(); + const TString &spanName = span.name(); + ui64 startTime = span.start_time_unix_nano(); + + Trace &trace = Traces[traceId]; + + trace.Spans.try_emplace(spanId, spanName, parentSpanId, startTime); + } + + [[nodiscard]] bool BuildTraceTrees() { + for (auto& tracePair : Traces) { + Trace& trace = tracePair.second; + + for (auto& spanPair : trace.Spans) { + Span& span = spanPair.second; + + const TString& parentSpanId = span.ParentSpanId; + + // Check if the span has a parent + if (!parentSpanId.empty()) { + auto parentSpanIt = trace.Spans.find(parentSpanId); + if (parentSpanIt == trace.Spans.end()) { + return false; + } + parentSpanIt->second.Children.insert(std::ref(span)); + } else { + trace.Root.Children.insert(std::ref(span)); + } + } + } + return true; + } + + void Clear() { + Traces.clear(); + } + + STRICT_STFUNC(StateFunc, + hFunc(NWilson::TEvWilson, Handle); + ); + + public: + std::unordered_map Traces; + }; + +} // NWilson diff --git a/ydb/library/actors/wilson/test_util/ya.make b/ydb/library/actors/wilson/test_util/ya.make new file mode 100644 index 000000000000..4ff8f8c26e86 --- /dev/null +++ b/ydb/library/actors/wilson/test_util/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +SRCS( + fake_wilson_uploader.h +) + +END() diff --git a/ydb/library/actors/wilson/ya.make b/ydb/library/actors/wilson/ya.make index 9786754fae78..07524c01b151 100644 --- a/ydb/library/actors/wilson/ya.make +++ b/ydb/library/actors/wilson/ya.make @@ -22,4 +22,5 @@ RECURSE( RECURSE_FOR_TESTS( ut + test_util )