Skip to content

Commit

Permalink
Merge 23edc19 into 96dfa75
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 authored Feb 15, 2024
2 parents 96dfa75 + 23edc19 commit f57a49b
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 23 deletions.
22 changes: 14 additions & 8 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,10 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
AsyncCompileResult->Continue().Apply(callback);
}

void AddMessageToReplayLog(const TString& queryPlan) {
void AddMessageToReplayLog(const TString& queryPlan, const TVector<NKikimrKqp::TKqpTableMetadataProto>& collectedSchemeData) {
NJson::TJsonValue replayMessage(NJson::JSON_MAP);

NJson::TJsonValue tablesMeta(NJson::JSON_ARRAY);
auto collectedSchemeData = Gateway->GetCollectedSchemeData();
for (auto proto: collectedSchemeData) {
tablesMeta.AppendValue(Base64Encode(proto.SerializeAsString()));
}
Expand Down Expand Up @@ -372,10 +371,14 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
PassAway();
}

void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType) {
void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery,
NKikimrKqp::EQueryType queryType,
const TVector<NKikimrKqp::TKqpTableMetadataProto>& collectedViewsMetadata = {}
) {
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
preparingQuery.release(), AppData()->FunctionRegistry);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
preparedQueryHolder->FillViews(collectedViewsMetadata);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery());

Expand Down Expand Up @@ -403,10 +406,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
Counters->ReportSqlVersion(DbCounters, *kqpResult.SqlVersion);
}

if (status == Ydb::StatusIds::SUCCESS) {
AddMessageToReplayLog(kqpResult.QueryPlan);
}

ETableReadType maxReadType = ExtractMostHeavyReadType(kqpResult.QueryPlan);

auto queryType = QueryId.Settings.QueryType;
Expand All @@ -415,7 +414,14 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

if (status == Ydb::StatusIds::SUCCESS) {
YQL_ENSURE(kqpResult.PreparingQuery);
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType);

auto collectedSchemeData = Gateway->GetCollectedSchemeData();
AddMessageToReplayLog(kqpResult.QueryPlan, collectedSchemeData);
EraseIf(collectedSchemeData, [](const NKikimrKqp::TKqpTableMetadataProto& metadata) {
return !metadata.HasKind() || static_cast<EKikimrTableKind>(metadata.GetKind()) != EKikimrTableKind::View;
});

FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, std::move(collectedSchemeData));

auto now = TInstant::Now();
auto duration = now - StartTime;
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/kqp/query_data/kqp_prepared_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,22 @@ void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField<
}
}
}

void TPreparedQueryHolder::FillViews(const TVector<NKikimrKqp::TKqpTableMetadataProto>& viewsMetadata) {
for (const auto& view : viewsMetadata) {
const auto& pathId = view.GetPathId();
const auto schemaVersion = view.GetSchemaVersion();
const auto& tableName = view.GetName();

NKqpProto::TKqpTableInfo tableInfo;
tableInfo.MutableTableId()->SetOwnerId(pathId.GetOwnerId());
tableInfo.MutableTableId()->SetTableId(pathId.GetTableId());
tableInfo.SetSchemaVersion(schemaVersion);
tableInfo.SetTableName(tableName);

QueryViews.emplace_back(std::move(tableInfo));
}
}

bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId) const {
if (!tempTablesState) {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/kqp/query_data/kqp_prepared_query.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ class TPreparedQueryHolder {
std::shared_ptr<const NKikimrKqp::TPreparedQuery> Proto;
std::shared_ptr<TPreparedQueryAllocHolder> Alloc;
TVector<TString> QueryTables;
TVector<NKqpProto::TKqpTableInfo> QueryViews;
std::vector<TKqpPhyTxHolder::TConstPtr> Transactions;
TIntrusivePtr<TTableConstInfoMap> TableConstInfoById;

Expand Down Expand Up @@ -180,6 +181,10 @@ class TPreparedQueryHolder {
return QueryTables;
}

const TVector<NKqpProto::TKqpTableInfo>& GetQueryViews() const {
return QueryViews;
}

const NKqpProto::TKqpPhyQuery& GetPhysicalQuery() const {
return Proto->GetPhysicalQuery();
}
Expand All @@ -192,6 +197,8 @@ class TPreparedQueryHolder {

void FillTables(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyStage>& stages);

void FillViews(const TVector<NKikimrKqp::TKqpTableMetadataProto>& viewsMetadata);

bool HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId) const;
};

Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildN
return std::make_unique<TEvTxProxySchemeCache::TEvNavigateKeySet>(navigate.Release());
}


bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) {
CompileResult = ev->CompileResult;
YQL_ENSURE(CompileResult);
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@ class TKqpQueryState : public TNonCopyable {
TableVersions.emplace(tableId, table.GetVersion());
}
};
auto addView = [&](const NKqpProto::TKqpTableInfo& view) {
NKikimr::TTableId tableId(view.GetTableId().GetOwnerId(), view.GetTableId().GetTableId());
auto it = TableVersions.find(tableId);
if (it != TableVersions.end()) {
Y_ENSURE(it->second == view.GetSchemaVersion());
} else {
TableVersions.emplace(tableId, view.GetSchemaVersion());
}
};

for (const auto& stage : phyTx.GetStages()) {
for (const auto& tableOp : stage.GetTableOps()) {
addTable(tableOp.GetTable());
Expand Down Expand Up @@ -232,6 +242,9 @@ class TKqpQueryState : public TNonCopyable {
addTable(table.GetId());
}
}
for (const auto& view : PreparedQuery->GetQueryViews()) {
addView(view);
}
}

bool NeedCheckTableVersions() const {
Expand Down
95 changes: 87 additions & 8 deletions ydb/core/kqp/ut/view/view_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>

#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/library/yql/sql/sql.h>
#include <ydb/library/yql/utils/log/log.h>
Expand All @@ -8,6 +10,7 @@

using namespace NKikimr;
using namespace NKikimr::NKqp;
using namespace NYdb;
using namespace NYdb::NTable;

namespace {
Expand Down Expand Up @@ -75,8 +78,37 @@ TDataQueryResult ExecuteDataModificationQuery(TSession& session,
return result;
}

TString GetYsonResults(TSession& session, const TString& query, const TExecDataQuerySettings& settings = {}) {
return FormatResultSetYson(ExecuteDataModificationQuery(session, query, settings).GetResultSet(0));
TValue GetSingleResult(const TDataQueryResult& rawResults) {
auto resultSetParser = rawResults.GetResultSetParser(0);
UNIT_ASSERT(resultSetParser.TryNextRow());
return resultSetParser.GetValue(0);
}

TValue GetSingleResult(TSession& session, const TString& query, const TExecDataQuerySettings& settings = {}) {
return GetSingleResult(ExecuteDataModificationQuery(session, query, settings));
}

TInstant GetTimestamp(const TValue& value) {
return TValueParser(value).GetTimestamp();
}

int GetInteger(const TValue& value) {
return TValueParser(value).GetInt32();
}

TMaybe<bool> GetFromCache(const TQueryStats& stats) {
const auto& proto = NYdb::TProtoAccessor::GetProto(stats);
if (!proto.Hascompilation()) {
return Nothing();
}
return proto.Getcompilation().Getfrom_cache();
}

void AssertFromCache(const TMaybe<TQueryStats>& stats, bool expectedValue) {
UNIT_ASSERT(stats.Defined());
const auto fromCache = GetFromCache(*stats);
UNIT_ASSERT_C(fromCache.Defined(), stats->ToString());
UNIT_ASSERT_VALUES_EQUAL_C(*fromCache, expectedValue, stats->ToString());
}

void CompareResults(const TDataQueryResult& first, const TDataQueryResult& second) {
Expand Down Expand Up @@ -384,6 +416,53 @@ Y_UNIT_TEST_SUITE(TSelectFromViewTest) {
ExecuteDataDefinitionQuery(session, ReadWholeFile(pathPrefix + "drop_view.sql"));
}
}

Y_UNIT_TEST(QueryCacheIsUpdated) {
TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false));
EnableViewsFeatureFlag(kikimr);
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();

constexpr const char* viewName = "TheView";

const auto getCreationQuery = [&viewName](const char* innerQuery) -> TString {
return std::format(R"(
CREATE VIEW {} WITH (security_invoker = TRUE) AS {};
)",
viewName,
innerQuery
);
};
constexpr const char* firstInnerQuery = "SELECT 1";
ExecuteDataDefinitionQuery(session, getCreationQuery(firstInnerQuery));

const TString selectFromViewQuery = std::format(R"(
SELECT * FROM {};
)",
viewName
);
TExecDataQuerySettings queryExecutionSettings;
queryExecutionSettings.KeepInQueryCache(true);
queryExecutionSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
ExecuteDataModificationQuery(session, selectFromViewQuery, queryExecutionSettings);
// make sure the server side cache is working by calling the same query twice
const auto cachedQueryRawResult = ExecuteDataModificationQuery(session, selectFromViewQuery, queryExecutionSettings);
AssertFromCache(cachedQueryRawResult.GetStats(), true);
UNIT_ASSERT_VALUES_EQUAL(GetInteger(GetSingleResult(cachedQueryRawResult)), 1);

// recreate the view with a different query inside
ExecuteDataDefinitionQuery(session, std::format(R"(
DROP VIEW {};
)",
viewName
)
);
constexpr const char* secondInnerQuery = "SELECT 2";
ExecuteDataDefinitionQuery(session, getCreationQuery(secondInnerQuery));

const auto secondCallRawResult = ExecuteDataModificationQuery(session, selectFromViewQuery, queryExecutionSettings);
AssertFromCache(secondCallRawResult.GetStats(), false);
UNIT_ASSERT_VALUES_EQUAL(GetInteger(GetSingleResult(secondCallRawResult)), 2);
}
}

Y_UNIT_TEST_SUITE(TEvaluateExprInViewTest) {
Expand Down Expand Up @@ -414,9 +493,9 @@ Y_UNIT_TEST_SUITE(TEvaluateExprInViewTest) {
TExecDataQuerySettings queryExecutionSettings;
queryExecutionSettings.KeepInQueryCache(true);
const auto executeTwice = [&](const TString& query) {
return TVector<TString>{
GetYsonResults(session, query, queryExecutionSettings),
GetYsonResults(session, query, queryExecutionSettings)
return TVector<TInstant>{
GetTimestamp(GetSingleResult(session, query, queryExecutionSettings)),
GetTimestamp(GetSingleResult(session, query, queryExecutionSettings))
};
};
const auto viewResults = executeTwice(selectFromViewQuery);
Expand Down Expand Up @@ -455,9 +534,9 @@ Y_UNIT_TEST_SUITE(TEvaluateExprInViewTest) {
TExecDataQuerySettings queryExecutionSettings;
queryExecutionSettings.KeepInQueryCache(true);
const auto executeTwice = [&](const TString& query) {
return TVector<TString>{
GetYsonResults(session, query, queryExecutionSettings),
GetYsonResults(session, query, queryExecutionSettings)
return TVector<TInstant>{
GetTimestamp(GetSingleResult(session, query, queryExecutionSettings)),
GetTimestamp(GetSingleResult(session, query, queryExecutionSettings))
};
};
const auto viewResults = executeTwice(selectFromViewQuery);
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/protos/kqp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,17 @@ message TQueryRequest {
}

message TKqpPathIdProto {
optional uint32 OwnerId = 1;
optional uint32 TableId = 2;
optional uint64 OwnerId = 1;
optional uint64 TableId = 2;
}

message TIndexDescriptionProto {
optional string Name = 1;
optional uint32 Type = 2;
optional uint32 State = 3;
optional uint32 SchemaVersion = 4;
optional uint32 LocalPathId = 5;
optional uint32 PathOwnerId = 6;
optional uint64 SchemaVersion = 4;
optional uint64 LocalPathId = 5;
optional uint64 PathOwnerId = 6;
repeated string KeyColumns = 7;
repeated string DataColumns = 8;
};
Expand Down Expand Up @@ -160,7 +160,7 @@ message TKqpTableMetadataProto {
optional string Name = 3;
optional string SysView = 4;
optional TKqpPathIdProto PathId = 5;
optional uint32 SchemaVersion = 6;
optional uint64 SchemaVersion = 6;
optional uint32 Kind = 7;
repeated TAttributeProto Attributes = 8;
repeated TKqpColumnMetadataProto Columns = 9;
Expand Down

0 comments on commit f57a49b

Please sign in to comment.