@@ -28,240 +28,6 @@ using namespace NKikimrConfig;
2828using namespace NYql ;
2929
3030
31- class TKqpQueryCache {
32- public:
33- TKqpQueryCache (size_t size, TDuration ttl)
34- : List(size)
35- , Ttl(ttl) {}
36-
37- void InsertQuery (const TKqpCompileResult::TConstPtr& compileResult) {
38- Y_ENSURE (compileResult->Query );
39- auto & query = *compileResult->Query ;
40-
41- YQL_ENSURE (compileResult->PreparedQuery );
42-
43- auto queryIt = QueryIndex.emplace (query, compileResult->Uid );
44- if (!queryIt.second ) {
45- EraseByUid (compileResult->Uid );
46- QueryIndex.erase (query);
47- }
48- Y_ENSURE (queryIt.second );
49- }
50-
51- void InsertAst (const TKqpCompileResult::TConstPtr& compileResult) {
52- Y_ENSURE (compileResult->Query );
53- Y_ENSURE (compileResult->GetAst ());
54-
55- AstIndex.emplace (GetQueryIdWithAst (*compileResult->Query , *compileResult->GetAst ()), compileResult->Uid );
56- }
57-
58- bool Insert (const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution) {
59- if (!isPerStatementExecution) {
60- InsertQuery (compileResult);
61- }
62- if (isEnableAstCache && compileResult->GetAst ()) {
63- InsertAst (compileResult);
64- }
65-
66- auto it = Index.emplace (compileResult->Uid , TCacheEntry{compileResult, TAppData::TimeProvider->Now () + Ttl});
67- Y_ABORT_UNLESS (it.second );
68-
69- TItem* item = &const_cast <TItem&>(*it.first );
70- auto removedItem = List.Insert (item);
71-
72- IncBytes (item->Value .CompileResult ->PreparedQuery ->ByteSize ());
73-
74- if (removedItem) {
75- DecBytes (removedItem->Value .CompileResult ->PreparedQuery ->ByteSize ());
76-
77- auto queryId = *removedItem->Value .CompileResult ->Query ;
78- QueryIndex.erase (queryId);
79- if (removedItem->Value .CompileResult ->GetAst ()) {
80- AstIndex.erase (GetQueryIdWithAst (queryId, *removedItem->Value .CompileResult ->GetAst ()));
81- }
82- auto indexIt = Index.find (*removedItem);
83- if (indexIt != Index.end ()) {
84- Index.erase (indexIt);
85- }
86- }
87-
88- Y_ABORT_UNLESS (List.GetSize () == Index.size ());
89-
90- return removedItem != nullptr ;
91- }
92-
93- void AttachReplayMessage (const TString uid, TString replayMessage) {
94- auto it = Index.find (TItem (uid));
95- if (it != Index.end ()) {
96- TItem* item = &const_cast <TItem&>(*it);
97- DecBytes (item->Value .ReplayMessage .size ());
98- item->Value .ReplayMessage = replayMessage;
99- item->Value .LastReplayTime = TInstant::Now ();
100- IncBytes (replayMessage.size ());
101- }
102- }
103-
104- TString ReplayMessageByUid (const TString uid, TDuration timeout) {
105- auto it = Index.find (TItem (uid));
106- if (it != Index.end ()) {
107- TInstant& lastReplayTime = const_cast <TItem&>(*it).Value .LastReplayTime ;
108- TInstant now = TInstant::Now ();
109- if (lastReplayTime + timeout < now) {
110- lastReplayTime = now;
111- return it->Value .ReplayMessage ;
112- }
113- }
114- return " " ;
115- }
116-
117- TKqpCompileResult::TConstPtr FindByUid (const TString& uid, bool promote) {
118- auto it = Index.find (TItem (uid));
119- if (it != Index.end ()) {
120- TItem* item = &const_cast <TItem&>(*it);
121- if (promote) {
122- item->Value .ExpiredAt = TAppData::TimeProvider->Now () + Ttl;
123- List.Promote (item);
124- }
125-
126- return item->Value .CompileResult ;
127- }
128-
129- return nullptr ;
130- }
131-
132- void Replace (const TKqpCompileResult::TConstPtr& compileResult) {
133- auto it = Index.find (TItem (compileResult->Uid ));
134- if (it != Index.end ()) {
135- TItem& item = const_cast <TItem&>(*it);
136- item.Value .CompileResult = compileResult;
137- }
138- }
139-
140- TKqpQueryId GetQueryIdWithAst (const TKqpQueryId& query, const NYql::TAstParseResult& ast) {
141- Y_ABORT_UNLESS (ast.Root );
142- std::shared_ptr<std::map<TString, Ydb::Type>> astPgParams;
143- if (query.QueryParameterTypes || ast.PgAutoParamValues ) {
144- astPgParams = std::make_shared<std::map<TString, Ydb::Type>>();
145- if (query.QueryParameterTypes ) {
146- for (const auto & [name, param] : *query.QueryParameterTypes ) {
147- astPgParams->insert ({name, param});
148- }
149- }
150- if (ast.PgAutoParamValues ) {
151- const auto & params = dynamic_cast <TKqpAutoParamBuilder*>(ast.PgAutoParamValues .Get ())->Values ;
152- for (const auto & [name, param] : params) {
153- astPgParams->insert ({name, param.Gettype ()});
154- }
155- }
156- }
157- return TKqpQueryId{query.Cluster , query.Database , query.DatabaseId , ast.Root ->ToString (), query.Settings , astPgParams, query.GUCSettings };
158- }
159-
160- TKqpCompileResult::TConstPtr FindByQuery (const TKqpQueryId& query, bool promote) {
161- auto uid = QueryIndex.FindPtr (query);
162- if (!uid) {
163- return nullptr ;
164- }
165-
166- return FindByUid (*uid, promote);
167- }
168-
169- TKqpCompileResult::TConstPtr FindByAst (const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote) {
170- auto uid = AstIndex.FindPtr (GetQueryIdWithAst (query, ast));
171- if (!uid) {
172- return nullptr ;
173- }
174-
175- return FindByUid (*uid, promote);
176- }
177-
178- bool EraseByUid (const TString& uid) {
179- auto it = Index.find (TItem (uid));
180- if (it == Index.end ()) {
181- return false ;
182- }
183-
184- TItem* item = &const_cast <TItem&>(*it);
185- List.Erase (item);
186-
187- DecBytes (item->Value .CompileResult ->PreparedQuery ->ByteSize ());
188- DecBytes (item->Value .ReplayMessage .size ());
189-
190- Y_ABORT_UNLESS (item->Value .CompileResult );
191- Y_ABORT_UNLESS (item->Value .CompileResult ->Query );
192- auto queryId = *item->Value .CompileResult ->Query ;
193- QueryIndex.erase (queryId);
194- if (item->Value .CompileResult ->GetAst ()) {
195- AstIndex.erase (GetQueryIdWithAst (queryId, *item->Value .CompileResult ->GetAst ()));
196- }
197-
198- Index.erase (it);
199-
200- Y_ABORT_UNLESS (List.GetSize () == Index.size ());
201- return true ;
202- }
203-
204- size_t Size () const {
205- return Index.size ();
206- }
207-
208- ui64 Bytes () const {
209- return ByteSize;
210- }
211-
212- size_t EraseExpiredQueries () {
213- auto prevSize = Size ();
214-
215- auto now = TAppData::TimeProvider->Now ();
216- while (List.GetSize () && List.GetOldest ()->Value .ExpiredAt <= now) {
217- EraseByUid (List.GetOldest ()->Key );
218- }
219-
220- Y_ABORT_UNLESS (List.GetSize () == Index.size ());
221- return prevSize - Size ();
222- }
223-
224- void Clear () {
225- List = TList (List.GetMaxSize ());
226- Index.clear ();
227- QueryIndex.clear ();
228- AstIndex.clear ();
229- ByteSize = 0 ;
230- }
231-
232- private:
233- void DecBytes (ui64 bytes) {
234- if (bytes > ByteSize) {
235- ByteSize = 0 ;
236- } else {
237- ByteSize -= bytes;
238- }
239- }
240-
241- void IncBytes (ui64 bytes) {
242- ByteSize += bytes;
243- }
244-
245- private:
246- struct TCacheEntry {
247- TKqpCompileResult::TConstPtr CompileResult;
248- TInstant ExpiredAt;
249- TString ReplayMessage = " " ;
250- TInstant LastReplayTime = TInstant::Zero();
251- };
252-
253- using TList = TLRUList<TString, TCacheEntry>;
254- using TItem = TList::TItem;
255-
256- private:
257- TList List;
258- THashSet<TItem, TItem::THash> Index;
259- THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> QueryIndex;
260- THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> AstIndex;
261- ui64 ByteSize = 0 ;
262- TDuration Ttl;
263- };
264-
26531struct TKqpCompileSettings {
26632 TKqpCompileSettings (bool keepInCache, bool isQueryActionPrepare, bool perStatementResult,
26733 const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE)
@@ -449,18 +215,20 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
449215 return NKikimrServices::TActivity::KQP_COMPILE_SERVICE;
450216 }
451217
452- TKqpCompileService (const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
218+ TKqpCompileService (
219+ TKqpQueryCache& queryCache,
220+ const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
453221 const TKqpSettings::TConstPtr& kqpSettings,
454222 TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
455223 std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
456224 std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
457225 )
458- : TableServiceConfig(tableServiceConfig)
226+ : QueryCache(queryCache)
227+ , TableServiceConfig(tableServiceConfig)
459228 , QueryServiceConfig(queryServiceConfig)
460229 , KqpSettings(kqpSettings)
461230 , ModuleResolverState(moduleResolverState)
462231 , Counters(counters)
463- , QueryCache(TableServiceConfig.GetCompileQueryCacheSize(), TDuration::Seconds(TableServiceConfig.GetCompileQueryCacheTTLSec()))
464232 , RequestsQueue(TableServiceConfig.GetCompileRequestQueueSize())
465233 , QueryReplayFactory(std::move(queryReplayFactory))
466234 , FederatedQuerySetup(federatedQuerySetup)
@@ -1219,28 +987,33 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
1219987 }
1220988
1221989private:
990+ TKqpQueryCache& QueryCache;
991+
1222992 TTableServiceConfig TableServiceConfig;
1223993 TQueryServiceConfig QueryServiceConfig;
1224994 TKqpSettings::TConstPtr KqpSettings;
1225995 TIntrusivePtr<TModuleResolverState> ModuleResolverState;
1226996 TIntrusivePtr<TKqpCounters> Counters;
1227997 THolder<IQueryReplayBackend> QueryReplayBackend;
1228998
1229- TKqpQueryCache QueryCache;
1230999 TKqpRequestsQueue RequestsQueue;
12311000 std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory;
12321001 std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
12331002
12341003 bool CollectDiagnostics = false ;
12351004};
12361005
1237- IActor* CreateKqpCompileService (const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
1006+ IActor* CreateKqpCompileService (
1007+ TKqpQueryCache& queryCache,
1008+ const TTableServiceConfig& tableServiceConfig, const TQueryServiceConfig& queryServiceConfig,
12381009 const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
12391010 std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
12401011 std::optional<TKqpFederatedQuerySetup> federatedQuerySetup
12411012 )
12421013{
1243- return new TKqpCompileService (tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters,
1014+ return new TKqpCompileService (
1015+ queryCache,
1016+ tableServiceConfig, queryServiceConfig, kqpSettings, moduleResolverState, counters,
12441017 std::move (queryReplayFactory), federatedQuerySetup);
12451018}
12461019
0 commit comments