diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index a3058abc1cd7..407974c5866b 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -349,7 +349,7 @@ void TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, bool shareMailbox, bool op limits.MemoryQuotaManager = std::make_shared(limit * 2, limit); auto computeActor = NKikimr::NKqp::CreateKqpComputeActor(ExecuterId, TxId, taskDesc, AsyncIoFactory, - AppData()->FunctionRegistry, settings, limits, NWilson::TTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr()); + AppData()->FunctionRegistry, settings, limits, ExecuterSpan.GetTraceId(), TasksGraph.GetMeta().GetArenaIntrusivePtr()); if (optimizeProtoForLocalExecution) { TVector& taskSourceSettings = static_cast(computeActor)->MutableTaskSourceSettings(); diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 2d2eca184568..9d1815092fab 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -399,6 +400,7 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq , Counters(counters) , UseFollowers(false) , PipeCacheId(MainPipeCacheId) + , ReadActorSpan(TWilsonKqp::ReadActor, NWilson::TTraceId(args.TraceId), "ReadActor") { Y_ABORT_UNLESS(Arena); Y_ABORT_UNLESS(settings->GetArena() == Arena->Get()); @@ -569,6 +571,9 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq ResolveShards[ResolveShardId] = state; ResolveShardId += 1; + ReadActorStateSpan = NWilson::TSpan(TWilsonKqp::ReadActorShardsResolve, ReadActorSpan.GetTraceId(), + "WaitForShardsResolve", NWilson::EFlags::AUTO_END); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); } @@ -617,9 +622,13 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq } } + ReadActorStateSpan.EndError(error); + return RuntimeError(error, statusCode); } + ReadActorStateSpan.EndOk(); + auto keyDesc = std::move(request->ResultSet[0].KeyDescription); if (keyDesc->GetPartitions().size() == 1) { @@ -896,10 +905,8 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq Counters->CreatedIterators->Inc(); ReadIdByTabletId[state->TabletId].push_back(id); - NWilson::TTraceId traceId; // TODO: get traceId from kqp. - Send(PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), - IEventHandle::FlagTrackDelivery, 0, std::move(traceId)); + IEventHandle::FlagTrackDelivery, 0, ReadActorSpan.GetTraceId()); if (!FirstShardStarted) { state->IsFirst = true; @@ -1385,6 +1392,8 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq } } TBase::PassAway(); + + ReadActorSpan.End(); } void RuntimeError(const TString& message, NYql::NDqProto::StatusIds::StatusCode statusCode, const NYql::TIssues& subIssues = {}) { @@ -1395,6 +1404,11 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq NYql::TIssues issues; issues.AddIssue(std::move(issue)); + + if (ReadActorSpan) { + ReadActorSpan.EndError(issues.ToOneLineString()); + } + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), statusCode)); } @@ -1491,6 +1505,9 @@ class TKqpReadActor : public TActorBootstrapped, public NYql::NDq size_t TotalRetries = 0; bool FirstShardStarted = false; + + NWilson::TSpan ReadActorSpan; + NWilson::TSpan ReadActorStateSpan; }; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 6834a048c4a5..b51d0d3b7faf 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -1,19 +1,20 @@ #include "kqp_stream_lookup_actor.h" -#include - #include #include #include #include +#include #include +#include +#include #include #include -#include + +#include #include -#include -#include #include +#include namespace NKikimr { namespace NKqp { @@ -25,24 +26,22 @@ static constexpr ui64 MAX_SHARD_RETRIES = 10; class TKqpStreamLookupActor : public NActors::TActorBootstrapped, public NYql::NDq::IDqComputeActorAsyncInput { public: - TKqpStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, - const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, - const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr& alloc, - const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings, + TKqpStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr counters) - : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId) - , InputIndex(inputIndex) - , Input(input) - , ComputeActorId(computeActorId) - , TypeEnv(typeEnv) - , Alloc(alloc) + : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << args.InputIndex << ", CA Id " << args.ComputeActorId) + , InputIndex(args.InputIndex) + , Input(args.TransformInput) + , ComputeActorId(args.ComputeActorId) + , TypeEnv(args.TypeEnv) + , Alloc(args.Alloc) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) - , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc)) + , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc)) , Counters(counters) + , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor") { - IngressStats.Level = statsLevel; + IngressStats.Level = args.StatsLevel; } virtual ~TKqpStreamLookupActor() { @@ -174,6 +173,8 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped::PassAway(); + + LookupActorSpan.End(); } i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe&, bool& finished, i64 freeSpace) final { @@ -234,10 +235,15 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedGetTablePath()); if (ev->Get()->Request->ErrorCount > 0) { - return RuntimeError(TStringBuilder() << "Failed to get partitioning for table: " - << StreamLookupWorker->GetTablePath(), NYql::NDqProto::StatusIds::SCHEME_ERROR); + TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: " + << StreamLookupWorker->GetTablePath(); + LookupActorStateSpan.EndError(errorMsg); + + return RuntimeError(errorMsg, NYql::NDqProto::StatusIds::SCHEME_ERROR); } + LookupActorStateSpan.EndOk(); + auto& resultSet = ev->Get()->Request->ResultSet; YQL_ENSURE(resultSet.size() == 1, "Expected one result for range [NULL, +inf)"); Partitioning = resultSet[0].KeyDescription->Partitioning; @@ -342,8 +348,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedGetTablePath() - << " (request timeout exceeded)", NYql::NDqProto::StatusIds::TIMEOUT); + TString errorMsg = TStringBuilder() << "Failed to resolve shards for table: " << StreamLookupWorker->GetTablePath() + << " (request timeout exceeded)"; + LookupActorStateSpan.EndError(errorMsg); + + RuntimeError(errorMsg, NYql::NDqProto::StatusIds::TIMEOUT); } } @@ -392,7 +401,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped{})); Counters->IteratorsShardResolve->Inc(); + LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(), + "WaitForShardsResolve", NWilson::EFlags::AUTO_END); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); @@ -467,6 +479,11 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped Counters; + NWilson::TSpan LookupActorSpan; + NWilson::TSpan LookupActorStateSpan; }; } // namespace -std::pair CreateStreamLookupActor(ui64 inputIndex, - NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - std::shared_ptr& alloc, const NYql::NDqProto::TTaskInput& inputDesc, +std::pair CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr counters) { - auto actor = new TKqpStreamLookupActor(inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory, - alloc, inputDesc, std::move(settings), counters); + auto actor = new TKqpStreamLookupActor(std::move(args), std::move(settings), counters); return {actor, actor}; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h index 7e8e266dbcbf..cb1c1ba8c2fb 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h @@ -7,10 +7,7 @@ namespace NKikimr { namespace NKqp { -std::pair CreateStreamLookupActor(ui64 inputIndex, - NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - std::shared_ptr& alloc, const NYql::NDqProto::TTaskInput& inputDesc, +std::pair CreateStreamLookupActor(NYql::NDq::IDqAsyncIoFactory::TInputTransformArguments&& args, NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr); } // namespace NKqp diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp index ddcd068a20fa..d244303fd956 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp @@ -7,8 +7,7 @@ namespace NKqp { void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr counters) { factory.RegisterInputTransform("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) { - return CreateStreamLookupActor(args.InputIndex, args.StatsLevel, args.TransformInput, args.ComputeActorId, args.TypeEnv, - args.HolderFactory, args.Alloc, args.InputDesc, std::move(settings), counters); + return CreateStreamLookupActor(std::move(args), std::move(settings), counters); }); } diff --git a/ydb/core/tx/datashard/datashard_ut_trace.cpp b/ydb/core/tx/datashard/datashard_ut_trace.cpp index 8438c1db04ee..2b377de08610 100644 --- a/ydb/core/tx/datashard/datashard_ut_trace.cpp +++ b/ydb/core/tx/datashard/datashard_ut_trace.cpp @@ -364,7 +364,7 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { } std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) " - ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (RunTasks) , " + ", (LiteralExecuter) , (DataExecuter -> [(WaitForTableResolve) , (WaitForSnapshot) , (ComputeActor) , (RunTasks) , " "(Datashard.Transaction -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " "(Datashard.Unit) , (Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " "(Tablet.Transaction.Execute -> [(Datashard.Unit)]) , (Tablet.Transaction.Wait) , (Tablet.Transaction.Enqueued) , " @@ -377,6 +377,68 @@ Y_UNIT_TEST_SUITE(TDataShardTrace) { UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); } + Y_UNIT_TEST(TestTraceDistributedSelectViaReadActors) { + auto [runtime, server, sender] = TestCreateServer(); + + CreateShardedTable(server, sender, "/Root", "table-1", 1, false); + + FakeWilsonUploader* uploader = new FakeWilsonUploader(); + TActorId uploaderId = runtime.Register(uploader, 0); + runtime.RegisterService(NWilson::MakeWilsonUploaderId(), uploaderId, 0); + runtime.SimulateSleep(TDuration::Seconds(10)); + + SplitTable(runtime, server, 5); + + ExecSQL( + server, + sender, + "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 100), (3, 300), (5, 500), (7, 700), (9, 900);", + true, + Ydb::StatusIds::SUCCESS + ); + + ExecSQL( + server, + sender, + "UPSERT INTO `/Root/table-1` (key, value) VALUES (2, 100), (4, 300), (6, 500), (8, 700), (10, 900);", + true, + Ydb::StatusIds::SUCCESS + ); + + NWilson::TTraceId traceId = NWilson::TTraceId::NewTraceId(15, 4095); + + ExecSQL( + server, + sender, + "SELECT * FROM `/Root/table-1`;", + true, + Ydb::StatusIds::SUCCESS, + std::move(traceId) + ); + + uploader->BuildTraceTrees(); + + UNIT_ASSERT_EQUAL(1, uploader->Traces.size()); + + FakeWilsonUploader::Trace& trace = uploader->Traces.begin()->second; + + auto readActorSpan = trace.Root.BFSFindOne("ReadActor"); + UNIT_ASSERT(readActorSpan); + + auto dsReads = readActorSpan->get().FindAll("DataShard.Read"); // Read actor sends EvRead to each shard. + UNIT_ASSERT_EQUAL(dsReads.size(), 2); + + std::string canon = "(Session.query.QUERY_ACTION_EXECUTE -> [(CompileService -> [(CompileActor)]) , " + "(DataExecuter -> [(WaitForTableResolve) , (WaitForShardsResolve) , (WaitForSnapshot) , (ComputeActor) , " + "(RunTasks) , (KqpNode.SendTasks) , (ComputeActor -> [(ReadActor -> [(WaitForShardsResolve) , " + "(DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> [(Datashard.Unit) , " + "(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> [(Tablet.WriteLog.LogEntry)])]) , " + "(ReadIterator.ReadOperation)]) , (DataShard.Read -> [(Tablet.Transaction -> [(Tablet.Transaction.Execute -> " + "[(Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit) , (Datashard.Unit)]) , (Tablet.WriteLog -> " + "[(Tablet.WriteLog.LogEntry)])]) , (ReadIterator.ReadOperation)])])])])])"; + UNIT_ASSERT_VALUES_EQUAL(canon, trace.ToString()); + } + Y_UNIT_TEST(TestTraceWriteImmediateOnShard) { auto [runtime, server, sender] = TestCreateServer(); diff --git a/ydb/library/wilson_ids/wilson.h b/ydb/library/wilson_ids/wilson.h index ba73508a4331..32cfce06998b 100644 --- a/ydb/library/wilson_ids/wilson.h +++ b/ydb/library/wilson_ids/wilson.h @@ -36,6 +36,12 @@ namespace NKikimr { ProposeTransaction = 9, ComputeActor = 9, + + ReadActor = 9, + ReadActorShardsResolve = 10, + + LookupActor = 9, + LookupActorShardsResolve = 10, }; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index f1fd8dddf137..cb3ebc1e687d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -217,6 +217,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { IMemoryQuotaManager::TPtr MemoryQuotaManager; const google::protobuf::Message* SourceSettings = nullptr; // used only in case if we execute compute actor locally TIntrusivePtr Arena; // Arena for SourceSettings + NWilson::TTraceId TraceId; }; struct TSinkArguments { @@ -247,6 +248,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { const NKikimr::NMiniKQL::THolderFactory& HolderFactory; NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; std::shared_ptr Alloc; + NWilson::TTraceId TraceId; }; struct TOutputTransformArguments { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index afea078bf101..9d477d407240 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1593,7 +1593,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr, .MemoryQuotaManager = MemoryLimits.MemoryQuotaManager, .SourceSettings = (!settings.empty() ? settings.at(inputIndex) : nullptr), - .Arena = Task.GetArena() + .Arena = Task.GetArena(), + .TraceId = ComputeActorSpan.GetTraceId() }); } catch (const std::exception& ex) { throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what(); @@ -1623,7 +1624,8 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .TypeEnv = typeEnv, .HolderFactory = holderFactory, .ProgramBuilder = *transform.ProgramBuilder, - .Alloc = TaskRunner->GetAllocatorPtr() + .Alloc = TaskRunner->GetAllocatorPtr(), + .TraceId = ComputeActorSpan.GetTraceId() }); } catch (const std::exception& ex) { throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();