Skip to content

Commit

Permalink
Merge 7aff430 into 40620ff
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored Oct 24, 2024
2 parents 40620ff + 7aff430 commit 581bc8d
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 34 deletions.
48 changes: 25 additions & 23 deletions ydb/core/fq/libs/compute/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <library/cpp/json/json_writer.h>
#include <library/cpp/json/yson/json2yson.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/protos/issue_severity.pb.h>

namespace NFq {

Expand Down Expand Up @@ -431,7 +432,7 @@ void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, ui32&
}
}

TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage, TString* timeline, ui64 maxTimelineSize) {
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage, TString* timeline) {
TStringStream out;
NYson::TYsonWriter writer(&out);
writer.OnBeginMap();
Expand Down Expand Up @@ -475,20 +476,7 @@ TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage, TString* time
if (timeline) {
TPlanVisualizer planViz;
planViz.LoadPlans(plan);
*timeline = planViz.PrintSvgSafe();
if (maxTimelineSize && timeline->size() > maxTimelineSize) {
TStringBuilder builder;
builder
<< "<svg width='600' height='200' xmlns='http://www.w3.org/2000/svg'>" << Endl
<< " <text font-size='16px' x='20' y='40'>There is nothing wrong with the request.</text>" << Endl
<< " <text font-size='16px' x='20' y='80'>Unfortunately, image size " << timeline->size() << " is too large.</text>" << Endl
<< " <text font-size='16px' x='20' y='120'>It exceeds limit of " << maxTimelineSize << " and was discarded</text>" << Endl
<< "</svg>" << Endl;
*timeline = builder;
}
// remove json "timeline" field after migration
writer.OnKeyedItem("timeline");
writer.OnStringScalar(*timeline);
*timeline = planViz.PrintSvg();
}
writer.OnEndMap();
return NJson2Yson::ConvertYson2Json(out.Str());
Expand Down Expand Up @@ -1164,7 +1152,7 @@ struct TNoneStatProcessor : IPlanStatProcessor {
return plan;
}

TString GetQueryStat(const TString&, double& cpuUsage, TString*, ui64) override {
TString GetQueryStat(const TString&, double& cpuUsage, TString*) override {
cpuUsage = 0.0;
return "";
}
Expand Down Expand Up @@ -1197,8 +1185,8 @@ struct TPlanStatProcessor : IPlanStatProcessor {
return plan;
}

TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline, ui64 maxtimelineSize) override {
return GetV1StatFromV2Plan(plan, &cpuUsage, timeline, maxtimelineSize);
TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline) override {
return GetV1StatFromV2Plan(plan, &cpuUsage, timeline);
}

TPublicStat GetPublicStat(const TString& stat) override {
Expand Down Expand Up @@ -1229,8 +1217,8 @@ struct TProfileStatProcessor : TPlanStatProcessor {
};

struct TProdStatProcessor : TFullStatProcessor {
TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline, ui64 maxtimelineSize) override {
return GetPrettyStatistics(GetV1StatFromV2Plan(plan, &cpuUsage, timeline, maxtimelineSize));
TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline) override {
return GetPrettyStatistics(GetV1StatFromV2Plan(plan, &cpuUsage, timeline));
}
};

Expand Down Expand Up @@ -1263,10 +1251,18 @@ Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(
) {
Fq::Private::PingTaskRequest pingTaskRequest = Build(queryStats);

// Application-level issues, pass as is
if (issues) {
NYql::IssuesToMessage(issues, pingTaskRequest.mutable_issues());
}

// Builder own (internal) issues will be logged later, just warn the user
if (Issues) {
auto* issue = pingTaskRequest.add_issues();
issue->set_message("There are minor issues with query statistics processing. You can supply query ID and ask support for the information.");
issue->set_severity(NYql::TSeverityIds::S_WARNING);
}

if (computeStatus) {
pingTaskRequest.set_status(*computeStatus);
}
Expand Down Expand Up @@ -1318,7 +1314,13 @@ Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(const TString& queryP
CpuUsage = 0.0;
try {
TString timeline;
auto stat = Processor->GetQueryStat(plan, CpuUsage, ShowQueryTimeline ? &timeline : nullptr, MaxQueryTimelineSize);
auto stat = Processor->GetQueryStat(plan, CpuUsage, ShowQueryTimeline ? &timeline : nullptr);

if (MaxQueryTimelineSize && timeline.size() > MaxQueryTimelineSize) {
Issues.AddIssue(NYql::TIssue(TStringBuilder() << "Timeline size " << timeline.size() << " exceeds limit of " << MaxQueryTimelineSize));
timeline = "";
}

pingTaskRequest.set_statistics(stat);
pingTaskRequest.set_dump_raw_statistics(true);
if (timeline) {
Expand All @@ -1329,8 +1331,8 @@ Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(const TString& queryP
flatStat["ComputeTimeUs"] = computeTimeUs;
SerializeStats(*pingTaskRequest.mutable_flat_stats(), flatStat);
PublicStat = Processor->GetPublicStat(stat);
} catch(const NJson::TJsonException& ex) {
Issues.AddIssue(NYql::TIssue(TStringBuilder() << "Error stat conversion: " << ex.what()));
} catch (const std::exception& e) {
Issues.AddIssue(NYql::TIssue(TStringBuilder() << "Error stat processing: " << e.what()));
}

return pingTaskRequest;
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/compute/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ inline std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(const TS
tableSettings);
}

TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr, TString* timeline = nullptr, ui64 maxTimelineSize = 0);
TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr, TString* timeline = nullptr);
TString GetV1StatFromV2PlanV2(const TString& plan);
TString GetPrettyStatistics(const TString& statistics);
THashMap<TString, i64> AggregateStats(TStringBuf plan);
Expand All @@ -55,7 +55,7 @@ struct IPlanStatProcessor {
virtual Ydb::Query::StatsMode GetStatsMode() = 0;
virtual TString ConvertPlan(const TString& plan) = 0;
virtual TString GetPlanVisualization(const TString& plan) = 0;
virtual TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline, ui64 maxtimelineSize) = 0;
virtual TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline) = 0;
virtual TPublicStat GetPublicStat(const TString& stat) = 0;
virtual THashMap<TString, i64> GetFlatStat(TStringBuf plan) = 0;
};
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ PEERDIR(
ydb/core/fq/libs/grpc
ydb/core/fq/libs/shared_resources
ydb/library/yql/public/issue
ydb/library/yql/public/issue/protos
ydb/library/yql/providers/common/http_gateway
ydb/library/yql/providers/dq/provider
ydb/library/yql/providers/generic/connector/api/service/protos
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
Fq::Private::PingTaskRequest pingTaskRequest = Builder.Build(QueryStats, Issues);
if (Builder.Issues) {
LOG_W(Builder.Issues.ToOneLineString());
GetStepCountersSubgroup()->GetCounter("StatIssues", true)->Inc();
}
ReportPublicCounters(Builder.PublicStat);
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest), 0, 1);
Expand All @@ -248,6 +249,7 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
Fq::Private::PingTaskRequest pingTaskRequest = Builder.Build(QueryStats, Issues, std::nullopt, StatusCode);
if (Builder.Issues) {
LOG_W(Builder.Issues.ToOneLineString());
GetStepCountersSubgroup()->GetCounter("StatIssues", true)->Inc();
}
ReportPublicCounters(Builder.PublicStat);
UpdateCpuQuota(Builder.CpuUsage);
Expand All @@ -263,6 +265,7 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
Fq::Private::PingTaskRequest pingTaskRequest = Builder.Build(QueryStats, Issues, ComputeStatus, std::nullopt);
if (Builder.Issues) {
LOG_W(Builder.Issues.ToOneLineString());
GetStepCountersSubgroup()->GetCounter("StatIssues", true)->Inc();
}
ReportPublicCounters(Builder.PublicStat);
UpdateCpuQuota(Builder.CpuUsage);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/ydb/stopper_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class TStopperActor : public TBaseComputeActor<TStopperActor> {
Fq::Private::PingTaskRequest pingTaskRequest = Builder.Build(response.QueryStats, response.Issues, FederatedQuery::QueryMeta::ABORTING_BY_USER, statusCode);
if (Builder.Issues) {
LOG_W(Builder.Issues.ToOneLineString());
GetStepCountersSubgroup()->GetCounter("StatIssues", true)->Inc();
}
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}
Expand Down
146 changes: 138 additions & 8 deletions ydb/public/lib/ydb_cli/common/plan2svg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ void TPlan::LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& no

TStringBuilder builder;

if (name == "Iterator" || name == "Member") {
if (name == "Iterator" || name == "Member" || name == "ToFlow") {
builder << "Reference";
} else {
builder << name;
Expand All @@ -461,11 +461,25 @@ void TPlan::LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& no
if (auto* limitNode = subNode.GetValueByPath("Limit")) {
builder << ": " << limitNode->GetStringSafe();
}
} else if (name == "Sort") {
if (auto* sortByNode = subNode.GetValueByPath("SortBy")) {
auto sortBy = sortByNode->GetStringSafe();
while (true) {
auto p = sortBy.find("row.");
if (p == sortBy.npos) {
break;
}
sortBy.erase(p, 4);
}
if (sortBy) {
builder << " by " << sortBy;
}
}
} else if (name == "Filter") {
if (auto* predicateNode = subNode.GetValueByPath("Predicate")) {
auto filter = predicateNode->GetStringSafe();
prevFilter = filter;
while(true) {
while (true) {
auto p = filter.find("item.");
if (p == filter.npos) {
break;
Expand All @@ -482,14 +496,110 @@ void TPlan::LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& no
}
builder << ": " << filter;
}
} else if (name == "TopSort") {
} else if (name == "Aggregate") {
if (auto* aggregationNode = subNode.GetValueByPath("Aggregation")) {
auto aggr = aggregationNode->GetStringSafe();
if (aggr) {
if (aggr.StartsWith("{")) {
aggr.erase(aggr.begin());
}
if (aggr.EndsWith("}")) {
aggr.erase(aggr.end() - 1);
}
while (true) {
auto p = aggr.find("_yql_agg_");
if (p == aggr.npos) {
break;
}
auto l = 9;
auto p1 = aggr.begin() + p + l;
while (p1 != aggr.end() && *p1 >= '0' && *p1 <= '9') {
p1++;
l++;
}
auto yqlAgg = aggr.substr(p, l);
if (p1 != aggr.end() && *p1 == ':') {
p1++;
l++;
if (p1 != aggr.end() && *p1 == ' ') {
p1++;
l++;
}
}
aggr.erase(p, l);

auto extraChars = 7;
p = aggr.find(",state." + yqlAgg);
if (p == aggr.npos) {
p = aggr.find("state." + yqlAgg + ",");
}
if (p == aggr.npos) {
p = aggr.find("state." + yqlAgg);
extraChars = 6;
}
if (p != aggr.npos) {
aggr.erase(p, yqlAgg.size() + extraChars);
}
}
while (true) {
auto p = aggr.find("item.");
if (p == aggr.npos) {
break;
}
aggr.erase(p, 5);
}
builder << " " << aggr;
}
}
if (auto* groupByNode = subNode.GetValueByPath("GroupBy")) {
auto groupBy = groupByNode->GetStringSafe();
while (true) {
auto p = groupBy.find("item.");
if (p == groupBy.npos) {
break;
}
groupBy.erase(p, 5);
}
if (groupBy) {
builder << ", Group By: " << groupBy;
}
}
} else if (name == "TableFullScan") {
if (auto* tableNode = subNode.GetValueByPath("Table")) {
auto table = tableNode->GetStringSafe();
auto n = table.find_last_of('/');
if (n != table.npos) {
table = table.substr(n + 1);
}
builder << " " << table;
}
builder << "(";
if (auto* readColumnsNode = subNode.GetValueByPath("ReadColumns")) {
bool firstColumn = true;
for (const auto& subNode : readColumnsNode->GetArray()) {
if (firstColumn) {
firstColumn = false;
} else {
builder << ", ";
}
builder << subNode.GetStringSafe();
}
}
builder << ")";
} else if (name == "TopSort" || name == "Top") {
if (auto* limitNode = subNode.GetValueByPath("Limit")) {
builder << ", Limit: " << limitNode->GetStringSafe();
auto limit = limitNode->GetStringSafe();
if (limit) {
builder << ", Limit: " << limit;
}
}
if (auto* topSortByNode = subNode.GetValueByPath("TopSortBy")) {
builder << ", TopSortBy: " << topSortByNode->GetStringSafe();
auto topSortBy = topSortByNode->GetStringSafe();
if (topSortBy) {
builder << ", TopSortBy: " << topSortBy;
}
}
} else if (name == "Iterator" || name == "Member") {
} else if (name == "Iterator" || name == "Member" || name == "ToFlow") {
if (auto* referenceNode = subNode.GetValueByPath(name)) {
auto referenceName = referenceNode->GetStringSafe();
references.insert(referenceName);
Expand Down Expand Up @@ -640,6 +750,7 @@ void TPlan::LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& no
}
if (planNodeType == "Connection") {
auto* keyColumnsNode = plan.GetValueByPath("KeyColumns");
auto* sortColumnsNode = plan.GetValueByPath("SortColumns");
if (auto* subNode = plan.GetValueByPath("Plans")) {
for (auto& plan : subNode->GetArray()) {
TString nodeType;
Expand All @@ -659,6 +770,11 @@ void TPlan::LoadStage(std::shared_ptr<TStage> stage, const NJson::TJsonValue& no
stage->Connections.back()->KeyColumns.push_back(keyColumn.GetStringSafe());
}
}
if (sortColumnsNode) {
for (auto& sortColumn : sortColumnsNode->GetArray()) {
stage->Connections.back()->SortColumns.push_back(sortColumn.GetStringSafe());
}
}

if (auto* planNodeIdNode = plan.GetValueByPath("PlanNodeId")) {
auto planNodeId = planNodeIdNode->GetStringRobust();
Expand Down Expand Up @@ -758,8 +874,9 @@ void TPlan::LoadSource(std::shared_ptr<TSource> source, const NJson::TJsonValue&
builder << " " << sourceTypeNode->GetStringSafe();
}
if (auto* nameNode = subNode.GetValueByPath("Name")) {
builder << " " << nameNode->GetStringSafe() << "(";
builder << " " << nameNode->GetStringSafe();
}
builder << "(";
if (auto* readColumnsNode = subNode.GetValueByPath("ReadColumns")) {
bool firstColumn = true;
for (const auto& subNode : readColumnsNode->GetArray()) {
Expand Down Expand Up @@ -1038,8 +1155,9 @@ void TPlan::PrintSvg(ui64 maxTime, ui32& offsetY, TStringBuilder& background, TS
if (!s->Info.empty()) {
for (auto text : s->Info) {
canvas
<< "<g><title>" << text << "</title>"
<< "<text clip-path='url(#clipTextPath)' font-family='Verdana' font-size='" << INTERNAL_TEXT_HEIGHT << "px' fill='" << Config.Palette.StageText << "' x='" << s->IndentX + INTERNAL_WIDTH + 2
<< "' y='" << y0 << "'>" << text << "</text>" << Endl;
<< "' y='" << y0 << "'>" << text << "</text>" << "</g>" << Endl;
y0 += (INTERNAL_TEXT_HEIGHT + INTERNAL_GAP_Y);
}
} else {
Expand Down Expand Up @@ -1295,6 +1413,18 @@ void TPlan::PrintSvg(ui64 maxTime, ui32& offsetY, TStringBuilder& background, TS
canvas << k;
}
}
if (!c->SortColumns.empty()) {
canvas << " SortColumns: ";
bool first = true;
for (auto s : c->SortColumns) {
if (first) {
first = false;
} else {
canvas << ", ";
}
canvas << s;
}
}
canvas
<< "</title>" << Endl
<< " <polygon points='" << x + INTERNAL_WIDTH << "," << y + INTERNAL_HEIGHT << " "
Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/common/plan2svg.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class TConnection {
std::shared_ptr<TSingleMetric> InputBytes;
std::shared_ptr<TSingleMetric> InputRows;
std::vector<std::string> KeyColumns;
std::vector<std::string> SortColumns;
bool CteConnection = false;
ui32 CteIndentX = 0;
ui32 CteOffsetY = 0;
Expand Down
Loading

0 comments on commit 581bc8d

Please sign in to comment.