Skip to content

Commit

Permalink
Merge 6d7e7a5 into 95ce0df
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored Dec 16, 2024
2 parents 95ce0df + 6d7e7a5 commit f3ee3c3
Show file tree
Hide file tree
Showing 19 changed files with 237 additions and 17 deletions.
4 changes: 3 additions & 1 deletion ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
.SetUseCancelAfter(false)
.SetSyntax(syntax)
.SetSupportStreamTrailingResult(true)
.SetOutputChunkMaxSize(req->response_part_limit_bytes());
.SetOutputChunkMaxSize(req->response_part_limit_bytes())
.SetCollectFullDiagnostics(req->Getcollect_full_diagnostics());

auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(
QueryAction,
Expand Down Expand Up @@ -394,6 +395,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
hasTrailingMessage = true;
response.mutable_tx_meta()->set_id(kqpResponse.GetTxMeta().id());
}
response.set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
}

if (hasTrailingMessage) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
&req->parameters(),
req->collect_stats(),
req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr,
req->has_operation_params() ? &req->operation_params() : nullptr);
req->has_operation_params() ? &req->operation_params() : nullptr,
NKqp::NPrivateEvents::TQueryRequestSettings().SetCollectFullDiagnostics(req->Getcollect_full_diagnostics()));

ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;

Expand Down Expand Up @@ -203,6 +204,7 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
queryMeta.mutable_parameters_types()->insert({queryParameter.GetName(), parameterType});
}
}
queryResult->set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@ struct TQueryRequestSettings {
return *this;
}

TQueryRequestSettings& SetCollectFullDiagnostics(bool flag) {
CollectFullDiagnostics = flag;
return *this;
}

ui64 OutputChunkMaxSize = 0;
bool KeepSession = false;
bool UseCancelAfter = true;
::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
bool SupportsStreamTrailingResult = false;
bool CollectFullDiagnostics = false;
};

struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents::EvQueryRequest> {
Expand Down Expand Up @@ -282,7 +288,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
}

bool GetCollectDiagnostics() const {
return Record.GetRequest().GetCollectDiagnostics();
return QuerySettings.CollectFullDiagnostics;
}

ui32 CalculateSerializedSize() const override {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetIsInternalCall(RequestCtx->IsInternalCall());
Record.MutableRequest()->SetOutputChunkMaxSize(QuerySettings.OutputChunkMaxSize);

Record.MutableRequest()->SetCollectDiagnostics(QuerySettings.CollectFullDiagnostics);

RequestCtx.reset();
}
}
Expand Down
66 changes: 66 additions & 0 deletions ydb/core/kqp/ut/query/kqp_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,72 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
UNIT_ASSERT_VALUES_EQUAL(counters.RecompileRequestGet()->Val(), 1);
}

Y_UNIT_TEST(ExecuteDataQueryCollectFullDiagnostics) {
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetKqpSettings({setting});

TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
UNIT_ASSERT(session.ExecuteSchemeQuery(R"(
CREATE TABLE `/Root/TestTable` (
Key Uint64,
Value String,
PRIMARY KEY (Key)
);
)").GetValueSync().IsSuccess());
}

{
const TString query(Q1_(R"(
SELECT * FROM `/Root/TestTable`;
)"));

{
auto settings = TExecDataQuerySettings();
settings.CollectFullDiagnostics(true);

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str());

UNIT_ASSERT_C(!result.GetDiagnostics().empty(), "Query result diagnostics is empty");

TStringStream in;
in << result.GetDiagnostics();
NJson::TJsonValue value;
ReadJsonTree(&in, &value);

UNIT_ASSERT_C(value.IsMap(), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_id"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("version"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_text"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_parameter_types"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("table_metadata"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value["table_metadata"].IsArray(), "Incorrect Diagnostics: table_metadata type should be an array");
UNIT_ASSERT_C(value.Has("created_at"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_syntax"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_database"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_cluster"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_plan"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_type"), "Incorrect Diagnostics");
}

{
auto settings = TExecDataQuerySettings();

auto result = session.ExecuteDataQuery(query, TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str());

UNIT_ASSERT_C(result.GetDiagnostics().empty(), "Query result diagnostics should be empty, but it's not");
}
}
}

Y_UNIT_TEST(QueryCachePermissionsLoss) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
Expand Down
49 changes: 49 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,55 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(ExecuteCollectFullDiagnostics) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();

{
TExecuteQuerySettings settings;
settings.CollectFullDiagnostics(true);

auto result = db.ExecuteQuery(R"(
SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
)", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_C(!result.GetDiagnostics().empty(), "Query result diagnostics is empty");

TStringStream in;
in << result.GetDiagnostics();
NJson::TJsonValue value;
ReadJsonTree(&in, &value);

UNIT_ASSERT_C(value.IsMap(), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_id"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("version"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_text"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_parameter_types"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("table_metadata"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value["table_metadata"].IsArray(), "Incorrect Diagnostics: table_metadata type should be an array");
UNIT_ASSERT_C(value.Has("created_at"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_syntax"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_database"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_cluster"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_plan"), "Incorrect Diagnostics");
UNIT_ASSERT_C(value.Has("query_type"), "Incorrect Diagnostics");
}

{
TExecuteQuerySettings settings;
settings.CollectFullDiagnostics(true);

auto result = db.ExecuteQuery(R"(
SELECT Key, Value2 FROM TwoShard WHERE Value2 > 0;
)", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString().c_str());

UNIT_ASSERT_C(result.GetDiagnostics().empty(), "Query result diagnostics should be empty, but it's not");
}
}

void CheckQueryResult(TExecuteQueryResult result) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/api/protos/ydb_query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ message ExecuteQueryRequest {
int64 response_part_limit_bytes = 9 [(Ydb.value) = "[0; 33554432]"];

string pool_id = 10; // Workload manager pool id

bool collect_full_diagnostics = 11;
}

message ResultSetMeta {
Expand All @@ -191,6 +193,9 @@ message ExecuteQueryResponsePart {
Ydb.TableStats.QueryStats exec_stats = 5;

TransactionMeta tx_meta = 6;

// Full query diagnostics
string query_full_diagnostics = 7;
}

message ExecuteScriptRequest {
Expand Down
3 changes: 3 additions & 0 deletions ydb/public/api/protos/ydb_table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,7 @@ message ExecuteDataQueryRequest {
QueryCachePolicy query_cache_policy = 5;
Ydb.Operations.OperationParams operation_params = 6;
QueryStatsCollection.Mode collect_stats = 7;
bool collect_full_diagnostics = 8;
}

message ExecuteDataQueryResponse {
Expand Down Expand Up @@ -984,6 +985,8 @@ message ExecuteQueryResult {
QueryMeta query_meta = 3;
// Query execution statistics
Ydb.TableStats.QueryStats query_stats = 4;
// Full query diagnostics
string query_full_diagnostics = 5;
}

// Explain data query
Expand Down
29 changes: 27 additions & 2 deletions ydb/public/lib/ydb_cli/commands/ydb_service_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ void TCommandExecuteQuery::Config(TConfig& config) {
config.Opts->AddLongOption('q', "query", "Text of query to execute").RequiredArgument("[String]").StoreResult(&Query);
config.Opts->AddLongOption('f', "file", "Path to file with query text to execute")
.RequiredArgument("PATH").StoreResult(&QueryFile);
config.Opts->AddLongOption("collect-diagnostics", "Collects diagnostics and saves it to file")
.StoreTrue(&CollectFullDiagnostics);

AddOutputFormats(config, {
EDataFormat::Pretty,
Expand Down Expand Up @@ -432,6 +434,9 @@ int TCommandExecuteQuery::ExecuteDataQuery(TConfig& config) {
NTable::TExecDataQuerySettings settings;
settings.KeepInQueryCache(true);
settings.CollectQueryStats(ParseQueryStatsModeOrThrow(CollectStatsMode, defaultStatsMode));
if (CollectFullDiagnostics) {
settings.CollectFullDiagnostics(true);
}

NTable::TTxSettings txSettings;
if (TxMode) {
Expand Down Expand Up @@ -516,6 +521,11 @@ void TCommandExecuteQuery::PrintDataQueryResponse(NTable::TDataQueryResult& resu
{
Cout << Endl << "Flame graph is available for full or profile stats only" << Endl;
}
if (CollectFullDiagnostics)
{
TFileOutput file(TStringBuilder() << "diagnostics_" << TGUID::Create().AsGuidString() << ".txt");
file << result.GetDiagnostics();
}
}

int TCommandExecuteQuery::ExecuteSchemeQuery(TConfig& config) {
Expand Down Expand Up @@ -548,7 +558,7 @@ namespace {
NQuery::TExecuteQuerySettings>;

template <typename TClient>
auto GetSettings(const TString& collectStatsMode, const bool basicStats, std::optional<TDuration> timeout) {
auto GetSettings(const TString& collectStatsMode, const bool basicStats, std::optional<TDuration> timeout, bool collectFullDiagnostics) {
if constexpr (std::is_same_v<TClient, NTable::TTableClient>) {
const auto defaultStatsMode = basicStats
? NTable::ECollectQueryStatsMode::Basic
Expand All @@ -558,6 +568,9 @@ namespace {
if (timeout.has_value()) {
settings.ClientTimeout(*timeout);
}
if (collectFullDiagnostics) {
settings.CollectFullDiagnostics(true);
}
return settings;
} else if constexpr (std::is_same_v<TClient, NQuery::TQueryClient>) {
const auto defaultStatsMode = basicStats
Expand All @@ -568,6 +581,9 @@ namespace {
if (timeout.has_value()) {
settings.ClientTimeout(*timeout);
}
if (collectFullDiagnostics) {
settings.CollectFullDiagnostics(true);
}
return settings;
}
Y_UNREACHABLE();
Expand Down Expand Up @@ -674,7 +690,7 @@ int TCommandExecuteQuery::ExecuteQueryImpl(TConfig& config) {
if (OperationTimeout) {
optTimeout = TDuration::MilliSeconds(FromString<ui64>(OperationTimeout));
}
const auto settings = GetSettings<TClient>(CollectStatsMode, BasicStats, optTimeout);
const auto settings = GetSettings<TClient>(CollectStatsMode, BasicStats, optTimeout, CollectFullDiagnostics);

TAsyncPartIterator<TClient> asyncResult;
SetInterruptHandlers();
Expand Down Expand Up @@ -732,6 +748,7 @@ template <typename TIterator>
bool TCommandExecuteQuery::PrintQueryResponse(TIterator& result) {
TMaybe<TString> stats;
TMaybe<TString> fullStats;
TString diagnostics;
{
TResultSetPrinter printer(OutputFormat, &IsInterrupted);

Expand All @@ -753,6 +770,8 @@ bool TCommandExecuteQuery::PrintQueryResponse(TIterator& result) {
fullStats = queryStats.GetPlan();
}
}

diagnostics = streamPart.GetDiagnostics();
}
} // TResultSetPrinter destructor should be called before printing stats

Expand All @@ -767,6 +786,12 @@ bool TCommandExecuteQuery::PrintQueryResponse(TIterator& result) {
queryPlanPrinter.Print(*fullStats);
}

if (CollectFullDiagnostics)
{
TFileOutput file(TStringBuilder() << "diagnostics_" << TGUID::Create().AsGuidString() << ".txt");
file << diagnostics;
}

PrintFlameGraph(fullStats);

if (IsInterrupted()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TCommandExecuteQuery : public TTableCommand, TCommandQueryBase, TCommandWi
TString TxMode;
TString QueryType;
bool BasicStats = false;
bool CollectFullDiagnostics = false;
};

class TCommandExplain : public TTableCommand, public TCommandWithOutput, TCommandQueryBase, TInterruptibleCommand {
Expand Down
Loading

0 comments on commit f3ee3c3

Please sign in to comment.