Skip to content

Commit 1b070e4

Browse files
authored
Merge 35683d1 into 4644e09
2 parents 4644e09 + 35683d1 commit 1b070e4

File tree

7 files changed

+416
-273
lines changed

7 files changed

+416
-273
lines changed

ydb/core/kqp/compile_service/kqp_compile_service.cpp

Lines changed: 33 additions & 260 deletions
Large diffs are not rendered by default.

ydb/core/kqp/compile_service/kqp_compile_service.h

Lines changed: 278 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
#include <ydb/core/kqp/common/simple/temp_tables.h>
55
#include <ydb/core/kqp/gateway/kqp_gateway.h>
66
#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
7+
#include <ydb/core/kqp/host/kqp_translate.h>
8+
9+
#include <util/system/spinlock.h>
710

811
namespace NKikimr {
912
namespace NKqp {
@@ -14,7 +17,281 @@ enum class ECompileActorAction {
1417
SPLIT,
1518
};
1619

17-
IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
20+
// Cache is shared between query sessions, compile service and kqp proxy.
21+
// Currently we don't use RW lock, because of cache promotions during lookup:
22+
// in benchmark both versions (RW spinlock and adaptive W spinlock) show same
23+
// performance. Might consider RW lock again in future.
24+
class TKqpQueryCache: public TThrRefBase {
25+
public:
26+
TKqpQueryCache(size_t size, TDuration ttl)
27+
: List(size)
28+
, Ttl(ttl) {}
29+
30+
bool Insert(const TKqpCompileResult::TConstPtr& compileResult, bool isEnableAstCache, bool isPerStatementExecution) {
31+
TAdaptiveLock guard(Lock);
32+
33+
if (!isPerStatementExecution) {
34+
InsertQuery(compileResult);
35+
}
36+
if (isEnableAstCache && compileResult->GetAst()) {
37+
InsertAst(compileResult);
38+
}
39+
40+
auto it = Index.emplace(compileResult->Uid, TCacheEntry{compileResult, TAppData::TimeProvider->Now() + Ttl});
41+
Y_ABORT_UNLESS(it.second);
42+
43+
TItem* item = &const_cast<TItem&>(*it.first);
44+
auto removedItem = List.Insert(item);
45+
46+
IncBytes(item->Value.CompileResult->PreparedQuery->ByteSize());
47+
48+
if (removedItem) {
49+
DecBytes(removedItem->Value.CompileResult->PreparedQuery->ByteSize());
50+
51+
auto queryId = *removedItem->Value.CompileResult->Query;
52+
QueryIndex.erase(queryId);
53+
if (removedItem->Value.CompileResult->GetAst()) {
54+
AstIndex.erase(GetQueryIdWithAst(queryId, *removedItem->Value.CompileResult->GetAst()));
55+
}
56+
auto indexIt = Index.find(*removedItem);
57+
if (indexIt != Index.end()) {
58+
Index.erase(indexIt);
59+
}
60+
}
61+
62+
Y_ABORT_UNLESS(List.GetSize() == Index.size());
63+
64+
return removedItem != nullptr;
65+
}
66+
67+
void AttachReplayMessage(const TString uid, TString replayMessage) {
68+
TAdaptiveLock guard(Lock);
69+
70+
auto it = Index.find(TItem(uid));
71+
if (it != Index.end()) {
72+
TItem* item = &const_cast<TItem&>(*it);
73+
DecBytes(item->Value.ReplayMessage.size());
74+
item->Value.ReplayMessage = replayMessage;
75+
item->Value.LastReplayTime = TInstant::Now();
76+
IncBytes(replayMessage.size());
77+
}
78+
}
79+
80+
TString ReplayMessageByUid(const TString uid, TDuration timeout) {
81+
TAdaptiveLock guard(Lock);
82+
83+
auto it = Index.find(TItem(uid));
84+
if (it != Index.end()) {
85+
TInstant& lastReplayTime = const_cast<TItem&>(*it).Value.LastReplayTime;
86+
TInstant now = TInstant::Now();
87+
if (lastReplayTime + timeout < now) {
88+
lastReplayTime = now;
89+
return it->Value.ReplayMessage;
90+
}
91+
}
92+
return "";
93+
}
94+
95+
TKqpCompileResult::TConstPtr FindByUid(const TString& uid, bool promote) {
96+
TAdaptiveLock guard(Lock);
97+
98+
auto it = Index.find(TItem(uid));
99+
if (it != Index.end()) {
100+
TItem* item = &const_cast<TItem&>(*it);
101+
if (promote) {
102+
item->Value.ExpiredAt = TAppData::TimeProvider->Now() + Ttl;
103+
List.Promote(item);
104+
}
105+
106+
return item->Value.CompileResult;
107+
}
108+
109+
return nullptr;
110+
}
111+
112+
void Replace(const TKqpCompileResult::TConstPtr& compileResult) {
113+
TAdaptiveLock guard(Lock);
114+
115+
auto it = Index.find(TItem(compileResult->Uid));
116+
if (it != Index.end()) {
117+
TItem& item = const_cast<TItem&>(*it);
118+
item.Value.CompileResult = compileResult;
119+
}
120+
}
121+
122+
TKqpCompileResult::TConstPtr FindByQuery(const TKqpQueryId& query, bool promote) {
123+
TAdaptiveLock guard(Lock);
124+
125+
auto uid = QueryIndex.FindPtr(query);
126+
if (!uid) {
127+
return nullptr;
128+
}
129+
130+
// we're holding read and assume it's recursive
131+
return FindByUid(*uid, promote);
132+
}
133+
134+
TKqpCompileResult::TConstPtr FindByAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast, bool promote) {
135+
TAdaptiveLock guard(Lock);
136+
137+
auto uid = AstIndex.FindPtr(GetQueryIdWithAst(query, ast));
138+
if (!uid) {
139+
return nullptr;
140+
}
141+
142+
return FindByUid(*uid, promote);
143+
}
144+
145+
bool EraseByUid(const TString& uid) {
146+
TAdaptiveLock guard(Lock);
147+
return EraseByUidImpl(uid);
148+
}
149+
150+
size_t Size() const {
151+
TAdaptiveLock guard(Lock);
152+
return SizeImpl();
153+
}
154+
155+
ui64 Bytes() const {
156+
TAdaptiveLock guard(Lock);
157+
return ByteSize;
158+
}
159+
160+
size_t EraseExpiredQueries() {
161+
TAdaptiveLock guard(Lock);
162+
163+
auto prevSize = SizeImpl();
164+
165+
auto now = TAppData::TimeProvider->Now();
166+
while (List.GetSize() && List.GetOldest()->Value.ExpiredAt <= now) {
167+
EraseByUidImpl(List.GetOldest()->Key);
168+
}
169+
170+
Y_ABORT_UNLESS(List.GetSize() == Index.size());
171+
return prevSize - SizeImpl();
172+
}
173+
174+
void Clear() {
175+
TAdaptiveLock guard(Lock);
176+
177+
List = TList(List.GetMaxSize());
178+
Index.clear();
179+
QueryIndex.clear();
180+
AstIndex.clear();
181+
ByteSize = 0;
182+
}
183+
184+
private:
185+
size_t SizeImpl() const {
186+
return Index.size();
187+
}
188+
189+
void InsertQuery(const TKqpCompileResult::TConstPtr& compileResult) {
190+
Y_ENSURE(compileResult->Query);
191+
auto& query = *compileResult->Query;
192+
193+
YQL_ENSURE(compileResult->PreparedQuery);
194+
195+
auto queryIt = QueryIndex.emplace(query, compileResult->Uid);
196+
if (!queryIt.second) {
197+
EraseByUid(compileResult->Uid);
198+
QueryIndex.erase(query);
199+
}
200+
Y_ENSURE(queryIt.second);
201+
}
202+
203+
void InsertAst(const TKqpCompileResult::TConstPtr& compileResult) {
204+
Y_ENSURE(compileResult->Query);
205+
Y_ENSURE(compileResult->GetAst());
206+
207+
AstIndex.emplace(GetQueryIdWithAst(*compileResult->Query, *compileResult->GetAst()), compileResult->Uid);
208+
}
209+
210+
bool EraseByUidImpl(const TString& uid) {
211+
auto it = Index.find(TItem(uid));
212+
if (it == Index.end()) {
213+
return false;
214+
}
215+
216+
TItem* item = &const_cast<TItem&>(*it);
217+
List.Erase(item);
218+
219+
DecBytes(item->Value.CompileResult->PreparedQuery->ByteSize());
220+
DecBytes(item->Value.ReplayMessage.size());
221+
222+
Y_ABORT_UNLESS(item->Value.CompileResult);
223+
Y_ABORT_UNLESS(item->Value.CompileResult->Query);
224+
auto queryId = *item->Value.CompileResult->Query;
225+
QueryIndex.erase(queryId);
226+
if (item->Value.CompileResult->GetAst()) {
227+
AstIndex.erase(GetQueryIdWithAst(queryId, *item->Value.CompileResult->GetAst()));
228+
}
229+
230+
Index.erase(it);
231+
232+
Y_ABORT_UNLESS(List.GetSize() == Index.size());
233+
return true;
234+
}
235+
236+
TKqpQueryId GetQueryIdWithAst(const TKqpQueryId& query, const NYql::TAstParseResult& ast) {
237+
Y_ABORT_UNLESS(ast.Root);
238+
std::shared_ptr<std::map<TString, Ydb::Type>> astPgParams;
239+
if (query.QueryParameterTypes || ast.PgAutoParamValues) {
240+
astPgParams = std::make_shared<std::map<TString, Ydb::Type>>();
241+
if (query.QueryParameterTypes) {
242+
for (const auto& [name, param] : *query.QueryParameterTypes) {
243+
astPgParams->insert({name, param});
244+
}
245+
}
246+
if (ast.PgAutoParamValues) {
247+
const auto& params = dynamic_cast<TKqpAutoParamBuilder*>(ast.PgAutoParamValues.Get())->Values;
248+
for (const auto& [name, param] : params) {
249+
astPgParams->insert({name, param.Gettype()});
250+
}
251+
}
252+
}
253+
return TKqpQueryId{query.Cluster, query.Database, query.DatabaseId, ast.Root->ToString(), query.Settings, astPgParams, query.GUCSettings};
254+
}
255+
256+
void DecBytes(ui64 bytes) {
257+
if (bytes > ByteSize) {
258+
ByteSize = 0;
259+
} else {
260+
ByteSize -= bytes;
261+
}
262+
}
263+
264+
void IncBytes(ui64 bytes) {
265+
ByteSize += bytes;
266+
}
267+
268+
private:
269+
struct TCacheEntry {
270+
TKqpCompileResult::TConstPtr CompileResult;
271+
TInstant ExpiredAt;
272+
TString ReplayMessage = "";
273+
TInstant LastReplayTime = TInstant::Zero();
274+
};
275+
276+
using TList = TLRUList<TString, TCacheEntry>;
277+
using TItem = TList::TItem;
278+
279+
private:
280+
TList List;
281+
THashSet<TItem, TItem::THash> Index;
282+
THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> QueryIndex;
283+
THashMap<TKqpQueryId, TString, THash<TKqpQueryId>> AstIndex;
284+
ui64 ByteSize = 0;
285+
TDuration Ttl;
286+
287+
TAdaptiveLock Lock;
288+
};
289+
290+
using TKqpQueryCachePtr = TIntrusivePtr<TKqpQueryCache>;
291+
292+
IActor* CreateKqpCompileService(
293+
TKqpQueryCachePtr queryCache,
294+
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
18295
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
19296
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState,
20297
TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,

ydb/core/kqp/proxy_service/kqp_proxy_service.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
209209
, ModuleResolverState()
210210
, KqpProxySharedResources(std::move(kqpProxySharedResources))
211211
, S3ActorsFactory(std::move(s3ActorsFactory))
212+
, QueryCache(new TKqpQueryCache(TableServiceConfig.GetCompileQueryCacheSize(), TDuration::Seconds(TableServiceConfig.GetCompileQueryCacheTTLSec())))
212213
{}
213214

214215
void Bootstrap(const TActorContext &ctx) {
@@ -276,7 +277,9 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
276277
}
277278

278279
// Create compile service
279-
CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, QueryServiceConfig,
280+
CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(
281+
QueryCache,
282+
TableServiceConfig, QueryServiceConfig,
280283
KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), FederatedQuerySetup));
281284
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
282285
MakeKqpCompileServiceID(SelfId().NodeId()), CompileService);
@@ -1496,7 +1499,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
14961499

14971500
auto config = CreateConfig(KqpSettings, workerSettings);
14981501

1499-
IActor* sessionActor = CreateKqpSessionActor(SelfId(), ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings,
1502+
IActor* sessionActor = CreateKqpSessionActor(SelfId(), QueryCache, ResourceManager_, CaFactory_, sessionId, KqpSettings, workerSettings,
15001503
FederatedQuerySetup, AsyncIoFactory, ModuleResolverState, Counters,
15011504
QueryServiceConfig, KqpTempTablesAgentActor);
15021505
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
@@ -1902,6 +1905,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
19021905
std::shared_ptr<TKqpProxySharedResources> KqpProxySharedResources;
19031906
std::shared_ptr<NYql::NDq::IS3ActorsFactory> S3ActorsFactory;
19041907

1908+
TKqpQueryCachePtr QueryCache;
1909+
19051910
bool ServerWorkerBalancerComplete = false;
19061911
std::optional<TString> SelfDataCenterId;
19071912
TIntrusivePtr<IRandomProvider> RandomProvider;

0 commit comments

Comments
 (0)