Skip to content

Commit

Permalink
prepare for add infra common bg_tasks (#4302)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored May 14, 2024
1 parent 601fa99 commit 4549fb4
Show file tree
Hide file tree
Showing 118 changed files with 2,812 additions and 512 deletions.
2 changes: 2 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ struct TKikimrEvents : TEvents {
ES_NEBIUS_ACCESS_SERVICE,
ES_REPLICATION_SERVICE,
ES_BACKUP_SERVICE,
ES_TX_BACKGROUND,
ES_SS_BG_TASKS
};
};

Expand Down
21 changes: 21 additions & 0 deletions ydb/core/grpc_services/rpc_list_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
#include <ydb/core/tx/schemeshard/olap/bg_tasks/events/global.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/public/lib/operation_id/operation_id.h>

Expand Down Expand Up @@ -50,6 +51,8 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
const auto& request = *GetProtoRequest();

switch (ParseKind(GetProtoRequest()->kind())) {
case TOperationId::SS_BG_TASKS:
return new NSchemeShard::NBackground::TEvListRequest(DatabaseName, request.page_size(), request.page_token());
case TOperationId::EXPORT:
return new TEvExport::TEvListExportsRequest(DatabaseName, request.page_size(), request.page_token(), request.kind());
case TOperationId::IMPORT:
Expand Down Expand Up @@ -117,6 +120,24 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
Reply(response);
}

void Handle(NSchemeShard::NBackground::TEvListResponse::TPtr& ev) {
const auto& record = ev->Get()->Record;

LOG_D("Handle TEvSchemeShard::TEvBGTasksListResponse: record# " << record.ShortDebugString());

TResponse response;

response.set_status(record.GetStatus());
if (record.GetIssues().size()) {
response.mutable_issues()->CopyFrom(record.GetIssues());
}
for (const auto& entry : record.GetEntries()) {
*response.add_operations() = entry;
}
response.set_next_page_token(record.GetNextPageToken());
Reply(response);
}

void SendListScriptExecutions() {
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvListScriptExecutionOperations(DatabaseName, GetProtoRequest()->page_size(), GetProtoRequest()->page_token()));
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ PEERDIR(
ydb/core/tx/datashard
ydb/core/tx/sharding
ydb/core/tx/data_events
ydb/core/tx/schemeshard/olap/bg_tasks/events
ydb/core/util
ydb/core/ydb_convert
ydb/core/security
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/counters_columnshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,8 @@ enum ETxTypes {
TXTYPE_DATA_SHARING_APPLY_LINKS_MODIFICATION = 27 [(TxTypeOpts) = {Name: "TxDataSharingApplyLinksModification"}];
TXTYPE_DATA_SHARING_WRITE_SOURCE_CURSOR = 28 [(TxTypeOpts) = {Name: "TxDataSharingWriteSourceCursor"}];
TXTYPE_EXPORT_SAVE_CURSOR = 29 [(TxTypeOpts) = {Name: "TxExportSaveCursor"}];
TXTYPE_REMOVE_BACKGROUND_SESSION = 30 [(TxTypeOpts) = {Name: "TxRemoveBackgroundSession"}];
TXTYPE_ADD_BACKGROUND_SESSION = 31 [(TxTypeOpts) = {Name: "TxAddBackgroundSession"}];
TXTYPE_SAVE_BACKGROUND_SESSION_PROGRESS = 32 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionProgress"}];
TXTYPE_SAVE_BACKGROUND_SESSION_STATE = 33 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionState"}];
}
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "adapter.h"

namespace NKikimr::NOlap::NBackground {

}
63 changes: 63 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/adapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#pragma once
#include "session.h"
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/status.h>

namespace NKikimr::NTabletFlatExecutor {
class TTabletExecutedFlat;
}

namespace NKikimr::NOlap::NBackground {

class ITabletAdapter {
private:
YDB_READONLY_DEF(NActors::TActorId, TabletActorId);
YDB_READONLY(TTabletId, TabletId, TTabletId(0));
NTabletFlatExecutor::TTabletExecutedFlat& TabletExecutor;
virtual bool DoLoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) = 0;
virtual void DoSaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoSaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoSaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& container) = 0;
virtual void DoRemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) = 0;
public:
ITabletAdapter(const NActors::TActorId& tabletActorId, const TTabletId tabletId, NTabletFlatExecutor::TTabletExecutedFlat& tabletExecutor)
: TabletActorId(tabletActorId)
, TabletId(tabletId)
, TabletExecutor(tabletExecutor)
{
AFL_VERIFY(!!TabletActorId);
AFL_VERIFY(!!(ui64)TabletId);
}
virtual ~ITabletAdapter() = default;

template <class T>
T& GetTabletExecutorVerifiedAs() {
T* result = dynamic_cast<T*>(&TabletExecutor);
AFL_VERIFY(result);
return *result;
}

void RemoveSessionFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TString& className, const TString& identifier) {
return DoRemoveSessionFromLocalDatabase(txc, className, identifier);
}

[[nodiscard]] bool LoadSessionsFromLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, std::deque<TSessionRecord>& records) {
return DoLoadSessionsFromLocalDatabase(txc, records);
}

void SaveSessionToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveSessionToLocalDatabase(txc, session);
}

void SaveStateToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveStateToLocalDatabase(txc, session);
}

void SaveProgressToLocalDatabase(NTabletFlatExecutor::TTransactionContext& txc, const TSessionRecord& session) {
return DoSaveProgressToLocalDatabase(txc, session);
}
};

}
35 changes: 35 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/control.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#include "control.h"
#include <ydb/core/tx/columnshard/bg_tasks/protos/data.pb.h>

namespace NKikimr::NOlap::NBackground {

NKikimrTxBackgroundProto::TSessionControlContainer TSessionControlContainer::SerializeToProto() const {
NKikimrTxBackgroundProto::TSessionControlContainer result;
*result.MutableStatusChannelContainer() = ChannelContainer.SerializeToString();
*result.MutableLogicControlContainer() = LogicControlContainer.SerializeToProto();
return result;
}

NKikimr::TConclusionStatus TSessionControlContainer::DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto) {
if (!ChannelContainer.DeserializeFromString(proto.GetStatusChannelContainer())) {
return TConclusionStatus::Fail("cannot parse channel from proto");
}
if (!LogicControlContainer.DeserializeFromProto(proto.GetLogicControlContainer())) {
return TConclusionStatus::Fail("cannot parse logic from proto");
}
return TConclusionStatus::Success();
}

NKikimr::TConclusionStatus ISessionLogicControl::DeserializeFromProto(const TProto& data) {
SessionClassName = data.GetSessionClassName();
SessionIdentifier = data.GetSessionIdentifier();
return DeserializeFromString(data.GetSessionControlDescription());
}

void ISessionLogicControl::SerializeToProto(TProto& proto) const {
proto.SetSessionClassName(SessionClassName);
proto.SetSessionIdentifier(SessionIdentifier);
proto.SetSessionControlDescription(SerializeToString());
}

}
80 changes: 80 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/control.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#pragma once
#include "session.h"
#include "status_channel.h"
#include <ydb/services/bg_tasks/abstract/interface.h>
#include <ydb/library/accessor/accessor.h>

namespace NKikimrTxBackgroundProto {
class TSessionControlContainer;
class TSessionLogicControlContainer;
}

namespace NKikimr::NOlap::NBackground {

class ISessionLogicControl {
private:
YDB_READONLY_DEF(TString, SessionClassName);
YDB_READONLY_DEF(TString, SessionIdentifier);
virtual TConclusionStatus DoApply(const std::shared_ptr<ISessionLogic>& session) const = 0;

virtual TConclusionStatus DoDeserializeFromString(const TString& data) = 0;
virtual TString DoSerializeToString() const = 0;
protected:
TConclusionStatus DeserializeFromString(const TString& data) {
return DoDeserializeFromString(data);
}
TString SerializeToString() const {
return DoSerializeToString();
}
public:
using TProto = NKikimrTxBackgroundProto::TSessionLogicControlContainer;
using TFactory = NObjectFactory::TObjectFactory<ISessionLogicControl, TString>;

virtual ~ISessionLogicControl() = default;
ISessionLogicControl() = default;
ISessionLogicControl(const TString& sessionClassName, const TString& sessionIdentifier)
: SessionClassName(sessionClassName)
, SessionIdentifier(sessionIdentifier)
{

}

TConclusionStatus DeserializeFromProto(const TProto& data);
void SerializeToProto(TProto& proto) const;

TConclusionStatus Apply(const std::shared_ptr<ISessionLogic>& session) const {
session->CheckStatusCorrect();
auto result = DoApply(session);
session->CheckStatusCorrect();
return result;
}

virtual TString GetClassName() const = 0;
};

class TSessionLogicControlContainer: public NBackgroundTasks::TInterfaceProtoContainer<ISessionLogicControl> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoContainer<ISessionLogicControl>;
public:
using TBase::TBase;
};

class TSessionControlContainer {
private:
YDB_READONLY_DEF(TStatusChannelContainer, ChannelContainer);
YDB_READONLY_DEF(TSessionLogicControlContainer, LogicControlContainer);
public:
NKikimrTxBackgroundProto::TSessionControlContainer SerializeToProto() const;
TConclusionStatus DeserializeFromProto(const NKikimrTxBackgroundProto::TSessionControlContainer& proto);

TSessionControlContainer() = default;

TSessionControlContainer(const TStatusChannelContainer& channel, const TSessionLogicControlContainer& logic)
: ChannelContainer(channel)
, LogicControlContainer(logic) {
AFL_VERIFY(!!ChannelContainer);
AFL_VERIFY(!!LogicControlContainer);
}
};

}
14 changes: 14 additions & 0 deletions ydb/core/tx/columnshard/bg_tasks/abstract/session.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "session.h"
#include "adapter.h"
#include <ydb/public/api/protos/ydb_operation.pb.h>

namespace NKikimr::NOlap::NBackground {

Ydb::Operations::Operation TSessionInfoReport::SerializeToProto() const {
Ydb::Operations::Operation result;
result.set_id(ClassName + "::" + Identifier);
result.set_ready(IsFinished);
return result;
}

}
Loading

0 comments on commit 4549fb4

Please sign in to comment.