Skip to content

Commit

Permalink
Merge 66734c8 into 1002b94
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Sep 23, 2024
2 parents 1002b94 + 66734c8 commit e8fd8f1
Show file tree
Hide file tree
Showing 17 changed files with 446 additions and 50 deletions.
42 changes: 42 additions & 0 deletions ydb/core/kqp/common/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ struct TEvKqp {
struct TEvScriptRequest : public TEventLocal<TEvScriptRequest, TKqpEvents::EvScriptRequest> {
TEvScriptRequest() = default;

const TString& GetDatabase() const {
return Record.GetRequest().GetDatabase();
}

const TString& GetDatabaseId() const {
return Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
Record.MutableRequest()->SetDatabaseId(databaseId);
}

mutable NKikimrKqp::TEvQueryRequest Record;
TDuration ForgetAfter;
TDuration ResultsTtl;
Expand Down Expand Up @@ -161,6 +173,36 @@ struct TEvKqp {
return issues;
}
};

struct TEvSubscribeOnDatabase : public TEventLocal<TEvSubscribeOnDatabase, TKqpEvents::EvSubscribeOnDatabase> {
explicit TEvSubscribeOnDatabase(const TString& database)
: Database(database)
{}

TString Database;
};

struct TEvUpdateDatabaseInfo : public TEventLocal<TEvUpdateDatabaseInfo, TKqpEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Status(status)
, Database(database)
, Issues(std::move(issues))
{}

TEvUpdateDatabaseInfo(const TString& database, const TString& databaseId, bool serverless)
: Status(Ydb::StatusIds::SUCCESS)
, Database(database)
, DatabaseId(databaseId)
, Serverless(serverless)
, Issues({})
{}

Ydb::StatusIds::StatusCode Status;
TString Database;
TString DatabaseId;
bool Serverless = false;
NYql::TIssues Issues;
};
};

} // namespace NKikimr::NKqp
12 changes: 12 additions & 0 deletions ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,17 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
return PoolConfig;
}

const TString& GetDatabaseId() const {
if (DatabaseId) {
return DatabaseId;
}
return Record.GetRequest().GetDatabaseId();
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

mutable NKikimrKqp::TEvQueryRequest Record;

private:
Expand All @@ -363,6 +374,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
mutable TIntrusiveConstPtr<NACLib::TUserToken> Token_;
TActorId RequestActorId;
TString Database;
TString DatabaseId;
TString SessionId;
TString YqlText;
TString QueryId;
Expand Down
52 changes: 34 additions & 18 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,34 @@ enum EFinalizationStatus : i32 {
FS_ROLLBACK,
};

struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
template <typename TEv, ui32 TEventType>
struct TEventWithDatabaseId : public NActors::TEventLocal<TEv, TEventType> {
TEventWithDatabaseId(const TString& database)
: Database(database)
, OperationId(id)
{}

const TString& GetDatabase() const {
return Database;
}

const TString& GetDatabaseId() const {
return DatabaseId;
}

void SetDatabaseId(const TString& databaseId) {
DatabaseId = databaseId;
}

const TString Database;
TString DatabaseId;
};

struct TEvForgetScriptExecutionOperation : public TEventWithDatabaseId<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{}

const NOperationId::TOperationId OperationId;
};

Expand All @@ -43,14 +64,12 @@ struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<T
NYql::TIssues Issues;
};

struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvGetScriptExecutionOperation : public TEventWithDatabaseId<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down Expand Up @@ -97,14 +116,13 @@ struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvG
TMaybe<google::protobuf::Any> Metadata;
};

struct TEvListScriptExecutionOperations : public NActors::TEventLocal<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
struct TEvListScriptExecutionOperations : public TEventWithDatabaseId<TEvListScriptExecutionOperations, TKqpScriptExecutionEvents::EvListScriptExecutionOperations> {
TEvListScriptExecutionOperations(const TString& database, const ui64 pageSize, const TString& pageToken)
: Database(database)
: TEventWithDatabaseId(database)
, PageSize(pageSize)
, PageToken(pageToken)
{}

TString Database;
ui64 PageSize;
TString PageToken;
};
Expand Down Expand Up @@ -151,14 +169,12 @@ struct TEvCheckAliveRequest : public NActors::TEventPB<TEvCheckAliveRequest, NKi
struct TEvCheckAliveResponse : public NActors::TEventPB<TEvCheckAliveResponse, NKikimrKqp::TEvCheckAliveResponse, TKqpScriptExecutionEvents::EvCheckAliveResponse> {
};

struct TEvCancelScriptExecutionOperation : public NActors::TEventLocal<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
explicit TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
struct TEvCancelScriptExecutionOperation : public TEventWithDatabaseId<TEvCancelScriptExecutionOperation, TKqpScriptExecutionEvents::EvCancelScriptExecutionOperation> {
TEvCancelScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: TEventWithDatabaseId(database)
, OperationId(id)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
};

Expand Down
10 changes: 0 additions & 10 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,4 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
: Database(database)
, Serverless(serverless)
{}

const TString Database;
const bool Serverless;
};

} // NKikimr::NKqp::NWorkload
4 changes: 4 additions & 0 deletions ydb/core/kqp/common/kqp_event_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetPoolId(PoolId);
}

if (!DatabaseId.empty()) {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}

Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ struct TKqpEvents {
EvListSessionsRequest,
EvListSessionsResponse,
EvListProxyNodesRequest,
EvListProxyNodesResponse
EvListProxyNodesResponse,
EvSubscribeOnDatabase,
EvUpdateDatabaseInfo
};

static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
Expand Down Expand Up @@ -174,7 +176,6 @@ struct TKqpWorkloadServiceEvents {
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
EvUpdateDatabaseInfo,
EvSubscribeOnPoolChanges,
};
};
Expand Down
150 changes: 150 additions & 0 deletions ydb/core/kqp/proxy_service/kqp_proxy_databases_cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#include "kqp_proxy_service_impl.h"

#include <ydb/core/kqp/workload_service/actors/actors.h>
#include <ydb/core/kqp/workload_service/common/events.h>

#include <ydb/core/tx/scheme_cache/scheme_cache.h>


namespace NKikimr::NKqp {

namespace {

class TDatabaseSubscriberActor : public TActor<TDatabaseSubscriberActor> {
using TBase = TActor<TDatabaseSubscriberActor>;

struct TDatabaseState {
bool FetchRequestIsRunning = false;
TPathId WatchPathId;

TString DatabaseId;
bool Serverless = false;
std::unordered_set<TActorId> Subscribers;
};

public:
TDatabaseSubscriberActor()
: TBase(&TDatabaseSubscriberActor::StateFunc)
{}

void Handle(TEvKqp::TEvSubscribeOnDatabase::TPtr& ev) {
const TString& database = CanonizePath(ev->Get()->Database);
auto& databaseState = Subscriptions[database];

if (databaseState.DatabaseId) {
SendSubscriberInfo(database, ev->Sender, databaseState, Ydb::StatusIds::SUCCESS);
} else if (!databaseState.FetchRequestIsRunning) {
Register(NWorkload::CreateDatabaseFetcherActor(SelfId(), database));
databaseState.FetchRequestIsRunning = true;
}

databaseState.Subscribers.insert(ev->Sender);
}

void Handle(NWorkload::TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) {
const TString& database = CanonizePath(ev->Get()->Database);
auto& databaseState = Subscriptions[database];

UpdateDatabaseState(databaseState, database, ev->Get()->PathId, ev->Get()->Serverless);
UpdateSubscribersInfo(database, databaseState, ev->Get()->Status, ev->Get()->Issues);

databaseState.FetchRequestIsRunning = false;
databaseState.WatchPathId = ev->Get()->PathId;

if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
WatchKey++;
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(databaseState.WatchPathId, WatchKey));
WatchDatabases.insert({WatchKey, database});
}
}

void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev) {
auto it = WatchDatabases.find(ev->Get()->Key);
if (it == WatchDatabases.end()) {
return;
}

const auto& result = ev->Get()->Result;
if (!result || result->GetStatus() != NKikimrScheme::StatusSuccess) {
return;
}

if (result->GetPathDescription().HasDomainDescription()) {
NSchemeCache::TDomainInfo description(result->GetPathDescription().GetDomainDescription());

auto& databaseState = Subscriptions[it->second];
UpdateDatabaseState(databaseState, it->second, description.DomainKey, description.IsServerless());
UpdateSubscribersInfo(it->second, databaseState, Ydb::StatusIds::SUCCESS);
}
}

void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) {
auto it = WatchDatabases.find(ev->Get()->Key);
if (it == WatchDatabases.end()) {
return;
}

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(ev->Get()->Key));

auto databaseStateIt = Subscriptions.find(it->second);
if (databaseStateIt != Subscriptions.end()) {
UpdateSubscribersInfo(it->second, databaseStateIt->second, Ydb::StatusIds::NOT_FOUND, {NYql::TIssue{"Database was dropped"}});
Subscriptions.erase(databaseStateIt);
}

WatchDatabases.erase(it);
}

void HandlePoison() {
if (!WatchDatabases.empty()) {
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0));
}

TBase::PassAway();
}

STRICT_STFUNC(StateFunc,
hFunc(TEvKqp::TEvSubscribeOnDatabase, Handle);
hFunc(NWorkload::TEvPrivate::TEvFetchDatabaseResponse, Handle);
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
hFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
sFunc(TEvents::TEvPoison, HandlePoison);
)

private:
void UpdateDatabaseState(TDatabaseState& databaseState, const TString& database, TPathId pathId, bool serverless) {
databaseState.DatabaseId = (serverless ? TStringBuilder() << pathId.OwnerId << ":" << pathId.LocalPathId << ":" : TStringBuilder()) << database;
databaseState.Serverless = serverless;
}

void UpdateSubscribersInfo(const TString& database, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
for (const auto& subscriber : databaseState.Subscribers) {
SendSubscriberInfo(database, subscriber, databaseState, status, issues);
}
}

void SendSubscriberInfo(const TString& database, TActorId subscriber, const TDatabaseState& databaseState, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) {
if (status == Ydb::StatusIds::SUCCESS || status == Ydb::StatusIds::UNSUPPORTED) {
Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, databaseState.DatabaseId, databaseState.Serverless));
} else {
NYql::TIssue rootIssue(TStringBuilder() << "Failed to describe database" << database);
for (const auto& issue : issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}
Send(subscriber, new TEvKqp::TEvUpdateDatabaseInfo(database, status, {rootIssue}));
}
}

private:
std::unordered_map<TString, TDatabaseState> Subscriptions;
std::unordered_map<ui32, TString> WatchDatabases;
ui32 WatchKey = 0;
};

} // anonymous namespace

void TDatabasesCache::CreateDatabaseSubscriberActor(TActorContext actorContext) {
SubscriberActor = actorContext.Register(new TDatabaseSubscriberActor());
}

} // namespace NKikimr::NKqp
Loading

0 comments on commit e8fd8f1

Please sign in to comment.