diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp index 6bf1b5c713ad..59dac9c6fda4 100644 --- a/ydb/core/grpc_services/rpc_export.cpp +++ b/ydb/core/grpc_services/rpc_export.cpp @@ -41,6 +41,7 @@ class TExportRPC: public TRpcOperationRequestActor, if (this->UserToken) { ev->Record.SetUserSID(this->UserToken->GetUserSID()); } + ev->Record.SetPeerName(this->Request->GetPeerName()); auto& createExport = *ev->Record.MutableRequest(); *createExport.MutableOperationParams() = request.operation_params(); diff --git a/ydb/core/grpc_services/rpc_import.cpp b/ydb/core/grpc_services/rpc_import.cpp index 174e9df84cb9..1cf09ad123c0 100644 --- a/ydb/core/grpc_services/rpc_import.cpp +++ b/ydb/core/grpc_services/rpc_import.cpp @@ -39,6 +39,7 @@ class TImportRPC: public TRpcOperationRequestActor, if (this->UserToken) { ev->Record.SetUserSID(this->UserToken->GetUserSID()); } + ev->Record.SetPeerName(this->Request->GetPeerName()); auto& createImport = *ev->Record.MutableRequest(); createImport.MutableOperationParams()->CopyFrom(request.operation_params()); diff --git a/ydb/core/protos/export.proto b/ydb/core/protos/export.proto index e8d78e9350a8..531ba4618e47 100644 --- a/ydb/core/protos/export.proto +++ b/ydb/core/protos/export.proto @@ -34,8 +34,9 @@ message TCreateExportRequest { message TEvCreateExportRequest { optional uint64 TxId = 1; optional string DatabaseName = 2; - optional string UserSID = 4; optional TCreateExportRequest Request = 3; + optional string UserSID = 4; + optional string PeerName = 5; } message TCreateExportResponse { @@ -70,8 +71,10 @@ message TCancelExportRequest { message TEvCancelExportRequest { optional uint64 TxId = 1; - optional string DatabaseName = 3; optional TCancelExportRequest Request = 2; + optional string DatabaseName = 3; + optional string UserSID = 4; + optional string PeerName = 5; } message TCancelExportResponse { @@ -90,8 +93,10 @@ message TForgetExportRequest { message TEvForgetExportRequest { optional uint64 TxId = 1; - optional string DatabaseName = 3; optional TForgetExportRequest Request = 2; + optional string DatabaseName = 3; + optional string UserSID = 4; + optional string PeerName = 5; } message TForgetExportResponse { diff --git a/ydb/core/protos/import.proto b/ydb/core/protos/import.proto index be4d36954d78..f13fa156ce93 100644 --- a/ydb/core/protos/import.proto +++ b/ydb/core/protos/import.proto @@ -34,6 +34,7 @@ message TEvCreateImportRequest { optional string DatabaseName = 2; optional string UserSID = 3; optional TCreateImportRequest Request = 4; + optional string PeerName = 5; } message TCreateImportResponse { @@ -68,8 +69,10 @@ message TCancelImportRequest { message TEvCancelImportRequest { optional uint64 TxId = 1; - optional string DatabaseName = 3; optional TCancelImportRequest Request = 2; + optional string DatabaseName = 3; + optional string UserSID = 4; + optional string PeerName = 5; } message TCancelImportResponse { @@ -88,8 +91,10 @@ message TForgetImportRequest { message TEvForgetImportRequest { optional uint64 TxId = 1; - optional string DatabaseName = 3; optional TForgetImportRequest Request = 2; + optional string DatabaseName = 3; + optional string UserSID = 4; + optional string PeerName = 5; } message TForgetImportResponse { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index d49712d3dc6d..f5f3cb1bbed3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4199,8 +4199,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase { TString settings = rowset.GetValue(); auto domainPathId = TPathId(rowset.GetValueOrDefault(selfId), rowset.GetValue()); + TString peerName = rowset.GetValueOrDefault(); - TExportInfo::TPtr exportInfo = new TExportInfo(id, uid, kind, settings, domainPathId); + TExportInfo::TPtr exportInfo = new TExportInfo(id, uid, kind, settings, domainPathId, peerName); if (rowset.HaveValue()) { exportInfo->UserSID = rowset.GetValue(); @@ -4297,11 +4298,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase { TImportInfo::EKind kind = static_cast(rowset.GetValue()); auto domainPathId = TPathId(rowset.GetValue(), rowset.GetValue()); + TString peerName = rowset.GetValueOrDefault(); Ydb::Import::ImportFromS3Settings settings; Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(settings, rowset.GetValue())); - TImportInfo::TPtr importInfo = new TImportInfo(id, uid, kind, settings, domainPathId); + TImportInfo::TPtr importInfo = new TImportInfo(id, uid, kind, settings, domainPathId, peerName); if (rowset.HaveValue()) { importInfo->UserSID = rowset.GetValue(); diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log.cpp b/ydb/core/tx/schemeshard/schemeshard_audit_log.cpp index 2336a7c2d97f..7e0cd3f54d87 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log.cpp @@ -1,20 +1,29 @@ -#include "schemeshard_audit_log.h" -#include "schemeshard_path.h" -#include "schemeshard_audit_log_fragment.h" +#include + +#include +#include -#include #include +#include +#include + #include -#include +#include + +#include "schemeshard_path.h" +#include "schemeshard_impl.h" +#include "schemeshard_xxport__helpers.h" +#include "schemeshard_audit_log_fragment.h" +#include "schemeshard_audit_log.h" namespace NKikimr::NSchemeShard { namespace { - const TString SchemeshardComponentName = "schemeshard"; - //NOTE: EmptyValue couldn't be an empty string as AUDIT_PART() skips parts with an empty values - const TString EmptyValue = "{none}"; -} +const TString SchemeshardComponentName = "schemeshard"; + +//NOTE: EmptyValue couldn't be an empty string as AUDIT_PART() skips parts with an empty values +const TString EmptyValue = "{none}"; TString GeneralStatus(NKikimrScheme::EStatus actualStatus) { switch(actualStatus) { @@ -68,6 +77,8 @@ TPath DatabasePathFromWorkingDir(TSchemeShard* SS, const TString &opWorkingDir) return databasePath; } +} // anonymous namespace + void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID) { // Each TEvModifySchemeTransaction.Transaction is a self sufficient operation and should be logged independently // (even if it was packed into a single TxProxy transaction with some other operations). @@ -167,6 +178,205 @@ void AuditLogModifySchemeTransactionDeprecated(const NKikimrScheme::TEvModifySch } } +namespace { + +struct TXxportRecord { + TString OperationName; + ui64 Id; + TString Uid; + TString RemoteAddress; + TString UserSID; + TString DatabasePath; + TString Status; + Ydb::StatusIds::StatusCode DetailedStatus; + TString Reason; + TVector> AdditionalParts; + TString StartTime; + TString EndTime; + TString CloudId; + TString FolderId; + TString ResourceId; +}; + +void AuditLogXxport(TXxportRecord&& record) { + AUDIT_LOG( + AUDIT_PART("component", SchemeshardComponentName) + + AUDIT_PART("id", std::to_string(record.Id)) + AUDIT_PART("uid", record.Uid); + AUDIT_PART("remote_address", (!record.RemoteAddress.empty() ? record.RemoteAddress : EmptyValue)) + AUDIT_PART("subject", (!record.UserSID.empty() ? record.UserSID : EmptyValue)) + AUDIT_PART("database", (!record.DatabasePath.empty() ? record.DatabasePath : EmptyValue)) + AUDIT_PART("operation", record.OperationName) + AUDIT_PART("status", record.Status) + AUDIT_PART("detailed_status", Ydb::StatusIds::StatusCode_Name(record.DetailedStatus)) + AUDIT_PART("reason", record.Reason) + + // all parts are considered required, so all empty values are replaced with a special stub + for (const auto& [name, value] : record.AdditionalParts) { + AUDIT_PART(name, (!value.empty() ? value : EmptyValue)) + } + + AUDIT_PART("start_time", record.StartTime) + AUDIT_PART("end_time", record.EndTime) + + AUDIT_PART("cloud_id", record.CloudId); + AUDIT_PART("folder_id", record.FolderId); + AUDIT_PART("resource_id", record.ResourceId); + ); +} + +using TParts = decltype(TXxportRecord::AdditionalParts); + +template +TParts ExportKindSpecificParts(const Proto& proto) { + //NOTE: intentional switch -- that will help to detect (by breaking the compilation) + // the moment when and if oneof Settings will be extended + switch (proto.GetSettingsCase()) { + case Proto::kExportToYtSettings: + return ExportKindSpecificParts(proto.GetExportToYtSettings()); + case Proto::kExportToS3Settings: + return ExportKindSpecificParts(proto.GetExportToS3Settings()); + case Proto::SETTINGS_NOT_SET: + return {}; + } +} +template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToYtSettings& proto) { + return { + {"export_type", "yt"}, + {"export_item_count", ToString(proto.items().size())}, + {"export_yt_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_path() : "")}, + }; +} +template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToS3Settings& proto) { + return { + {"export_type", "s3"}, + {"export_item_count", ToString(proto.items().size())}, + {"export_s3_bucket", proto.bucket()}, + //NOTE: take first item's destination_prefix as a "good enough approximation" + // (each item has its own destination_prefix, but in practice they are all the same) + {"export_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_prefix() : "")}, + }; +} + +template +TParts ImportKindSpecificParts(const Proto& proto) { + //NOTE: intentional switch -- that will help to detect (by breaking the compilation) + // the moment when and if oneof Settings will be extended + switch (proto.GetSettingsCase()) { + case Proto::kImportFromS3Settings: + return ImportKindSpecificParts(proto.GetImportFromS3Settings()); + case Proto::SETTINGS_NOT_SET: + return {}; + } +} +template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromS3Settings& proto) { + return { + {"import_type", "s3"}, + {"export_item_count", ToString(proto.items().size())}, + {"import_s3_bucket", proto.bucket()}, + //NOTE: take first item's source_prefix as a "good enough approximation" + // (each item has its own source_prefix, but in practice they are all the same) + {"import_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).source_prefix() : "")}, + }; +} + +} // anonymous namespace + +template +void _AuditLogXxportStart(const Request& request, const Response& response, const TString& operationName, TParts&& additionalParts, TSchemeShard* SS) { + TPath databasePath = DatabasePathFromWorkingDir(SS, request.GetDatabaseName()); + auto [cloud_id, folder_id, database_id] = GetDatabaseCloudIds(databasePath); + auto peerName = NKikimr::NAddressClassifier::ExtractAddress(request.GetPeerName()); + const auto& entry = response.GetResponse().GetEntry(); + + AuditLogXxport({ + .OperationName = operationName, + //NOTE: original request's tx-id is used as an operation id + .Id = request.GetTxId(), + .Uid = GetUid(request.GetRequest().GetOperationParams()), + .RemoteAddress = peerName, + .UserSID = request.GetUserSID(), + .DatabasePath = databasePath.PathString(), + .Status = (entry.GetStatus() == Ydb::StatusIds::SUCCESS ? "SUCCESS" : "ERROR"), + .DetailedStatus = entry.GetStatus(), + //NOTE: use main issue (on {ex,im}port itself), ignore issues on individual items + .Reason = ((entry.IssuesSize() > 0) ? entry.GetIssues(0).message() : ""), + + .AdditionalParts = std::move(additionalParts), + + // no start or end times + + .CloudId = cloud_id, + .FolderId = folder_id, + .ResourceId = database_id, + }); +} + +void AuditLogExportStart(const NKikimrExport::TEvCreateExportRequest& request, const NKikimrExport::TEvCreateExportResponse& response, TSchemeShard* SS) { + _AuditLogXxportStart(request, response, "EXPORT START", ExportKindSpecificParts(request.GetRequest()), SS); +} + +void AuditLogImportStart(const NKikimrImport::TEvCreateImportRequest& request, const NKikimrImport::TEvCreateImportResponse& response, TSchemeShard* SS) { + _AuditLogXxportStart(request, response, "IMPORT START", ImportKindSpecificParts(request.GetRequest()), SS); +} + +template +void _AuditLogXxportEnd(const Info& info, const TString& operationName, TParts&& additionalParts, TSchemeShard* SS) { + const TPath databasePath = TPath::Init(info.DomainPathId, SS); + auto [cloud_id, folder_id, database_id] = GetDatabaseCloudIds(databasePath); + auto peerName = NKikimr::NAddressClassifier::ExtractAddress(info.PeerName); + TString userSID = *info.UserSID.OrElse(EmptyValue); + TString startTime = (info.StartTime != TInstant::Zero() ? info.StartTime.ToString() : TString()); + TString endTime = (info.EndTime != TInstant::Zero() ? info.EndTime.ToString() : TString()); + + // Info.State can't be anything but Done or Cancelled here + Y_ABORT_UNLESS(info.State == Info::EState::Done || info.State == Info::EState::Cancelled); + TString status = TString(info.State == Info::EState::Done ? "SUCCESS" : "ERROR"); + Ydb::StatusIds::StatusCode detailedStatus = (info.State == Info::EState::Done ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::CANCELLED); + + AuditLogXxport({ + .OperationName = operationName, + .Id = info.Id, + .Uid = info.Uid, + .RemoteAddress = peerName, + .UserSID = userSID, + .DatabasePath = databasePath.PathString(), + .Status = status, + .DetailedStatus = detailedStatus, + .Reason = info.Issue, + + .AdditionalParts = std::move(additionalParts), + + .StartTime = startTime, + .EndTime = endTime, + + .CloudId = cloud_id, + .FolderId = folder_id, + .ResourceId = database_id, + }); +} + +void AuditLogExportEnd(const TExportInfo& info, TSchemeShard* SS) { + NKikimrExport::TCreateExportRequest proto; + // TSchemeShard::FromXxportInfo() can not be used here + switch (info.Kind) { + case TExportInfo::EKind::YT: + Y_ABORT_UNLESS(proto.MutableExportToYtSettings()->ParseFromString(info.Settings)); + proto.MutableExportToYtSettings()->clear_token(); + break; + case TExportInfo::EKind::S3: + Y_ABORT_UNLESS(proto.MutableExportToS3Settings()->ParseFromString(info.Settings)); + proto.MutableExportToS3Settings()->clear_access_key(); + proto.MutableExportToS3Settings()->clear_secret_key(); + break; + } + _AuditLogXxportEnd(info, "EXPORT END", ExportKindSpecificParts(proto), SS); +} +void AuditLogImportEnd(const TImportInfo& info, TSchemeShard* SS) { + _AuditLogXxportEnd(info, "IMPORT END", ImportKindSpecificParts(info.Settings), SS); +} + void AuditLogLogin(const NKikimrScheme::TEvLogin& request, const NKikimrScheme::TEvLoginResult& response, TSchemeShard* SS) { static const TString LoginOperationName = "LOGIN"; diff --git a/ydb/core/tx/schemeshard/schemeshard_audit_log.h b/ydb/core/tx/schemeshard/schemeshard_audit_log.h index ca5918880aca..1053823b4d00 100644 --- a/ydb/core/tx/schemeshard/schemeshard_audit_log.h +++ b/ydb/core/tx/schemeshard/schemeshard_audit_log.h @@ -10,12 +10,30 @@ class TEvLogin; class TEvLoginResult; } +namespace NKikimrExport { +class TEvCreateExportRequest; +class TEvCreateExportResponse; +} + +namespace NKikimrImport { +class TEvCreateImportRequest; +class TEvCreateImportResponse; +} + namespace NKikimr::NSchemeShard { class TSchemeShard; +struct TExportInfo; +struct TImportInfo; void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID); void AuditLogModifySchemeTransactionDeprecated(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID); +void AuditLogExportStart(const NKikimrExport::TEvCreateExportRequest& request, const NKikimrExport::TEvCreateExportResponse& response, TSchemeShard* SS); +void AuditLogExportEnd(const TExportInfo& exportInfo, TSchemeShard* SS); + +void AuditLogImportStart(const NKikimrImport::TEvCreateImportRequest& request, const NKikimrImport::TEvCreateImportResponse& response, TSchemeShard* SS); +void AuditLogImportEnd(const TImportInfo& importInfo, TSchemeShard* SS); + void AuditLogLogin(const NKikimrScheme::TEvLogin& request, const NKikimrScheme::TEvLoginResult& response, TSchemeShard* SS); } diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp index 6021cbe67056..450db67f8e46 100644 --- a/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_build_index__create.cpp @@ -1,4 +1,5 @@ #include "schemeshard_build_index.h" +#include "schemeshard_xxport__helpers.h" #include "schemeshard_build_index_helpers.h" #include "schemeshard_build_index_tx_base.h" #include "schemeshard_impl.h" @@ -29,7 +30,7 @@ class TSchemeShard::TIndexBuilder::TTxCreate: public TSchemeShard::TIndexBuilder << "Index build with id '" << id << "' already exists"); } - const TString& uid = GetUid(request.GetOperationParams().labels()); + const TString& uid = GetUid(request.GetOperationParams()); if (uid && Self->IndexBuildsByUid.contains(uid)) { return Reply(Ydb::StatusIds::ALREADY_EXISTS, TStringBuilder() << "Index build with uid '" << uid << "' already exists"); @@ -212,7 +213,7 @@ class TSchemeShard::TIndexBuilder::TTxCreate: public TSchemeShard::TIndexBuilder if (settings.has_index() && settings.has_column_build_operation()) { explain = "unable to build index and column in the single operation"; - return false; + return false; } if (settings.has_index()) { @@ -240,19 +241,10 @@ class TSchemeShard::TIndexBuilder::TTxCreate: public TSchemeShard::TIndexBuilder Ydb::StatusIds::StatusCode status; if (!FillIndexTablePartitioning(buildInfo->ImplTableDescription, index, status, explain)) { return false; - } + } } return true; } - - static TString GetUid(const google::protobuf::Map& labels) { - auto it = labels.find("uid"); - if (it == labels.end()) { - return TString(); - } - - return it->second; - } }; ITransaction* TSchemeShard::CreateTxCreate(TEvIndexBuilder::TEvCreateRequest::TPtr& ev) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index 2e0aa11030e4..a8ae8e67d9a6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -83,7 +83,7 @@ namespace { void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportInfo::TPtr exportInfo) { exprt.SetId(exportInfo->Id); exprt.SetStatus(Ydb::StatusIds::SUCCESS); - + if (exportInfo->StartTime != TInstant::Zero()) { *exprt.MutableStartTime() = SecondsToProtoTimeStamp(exportInfo->StartTime.Seconds()); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp index 531168da9275..ccc6146e269d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__cancel.cpp @@ -1,6 +1,7 @@ #include "schemeshard_xxport__tx_base.h" #include "schemeshard_export_flow_proposals.h" #include "schemeshard_export.h" +#include "schemeshard_audit_log.h" #include "schemeshard_impl.h" #include @@ -88,6 +89,11 @@ struct TSchemeShard::TExport::TTxCancel: public TSchemeShard::TXxport::TTxBase { Send(Request->Sender, std::move(response), 0, Request->Cookie); SendNotificationsIfFinished(exportInfo); + + if (exportInfo->IsFinished()) { + AuditLogExportEnd(*exportInfo.Get(), Self); + } + return true; } @@ -167,6 +173,11 @@ struct TSchemeShard::TExport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas } SendNotificationsIfFinished(exportInfo); + + if (exportInfo->IsFinished()) { + AuditLogExportEnd(*exportInfo.Get(), Self); + } + return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index dc822ca86d5b..f65eabb43f89 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -1,7 +1,9 @@ #include "schemeshard_xxport__tx_base.h" +#include "schemeshard_xxport__helpers.h" #include "schemeshard_export_flow_proposals.h" #include "schemeshard_export_helpers.h" #include "schemeshard_export.h" +#include "schemeshard_audit_log.h" #include "schemeshard_impl.h" #include @@ -50,7 +52,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { ); } - const TString& uid = GetUid(request.GetRequest().GetOperationParams().labels()); + const TString& uid = GetUid(request.GetRequest().GetOperationParams()); if (uid) { if (auto it = Self->ExportsByUid.find(uid); it != Self->ExportsByUid.end()) { if (IsSameDomain(it->second, request.GetDatabaseName())) { @@ -95,7 +97,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { case NKikimrExport::TCreateExportRequest::kExportToYtSettings: { const auto& settings = request.GetRequest().GetExportToYtSettings(); - exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::YT, settings, domainPath.Base()->PathId); + exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::YT, settings, domainPath.Base()->PathId, request.GetPeerName()); TString explain; if (!FillItems(exportInfo, settings, explain)) { @@ -115,7 +117,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { settings.set_scheme(Ydb::Export::ExportToS3Settings::HTTPS); } - exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::S3, settings, domainPath.Base()->PathId); + exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName()); TString explain; if (!FillItems(exportInfo, settings, explain)) { @@ -166,15 +168,6 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } private: - static TString GetUid(const google::protobuf::Map& labels) { - auto it = labels.find("uid"); - if (it == labels.end()) { - return TString(); - } - - return it->second; - } - bool Reply( THolder response, const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, @@ -191,6 +184,8 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { AddIssue(exprt, errorMessage); } + AuditLogExportStart(Request->Get()->Record, response->Record, Self); + Send(Request->Sender, std::move(response), 0, Request->Cookie); return true; @@ -895,7 +890,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase OnNotifyResult(txId, id, itemIdx, txc); Self->TxIdToExport.erase(txId); } - + if (Self->TxIdToDependentExport.contains(txId)) { for (const auto id : Self->TxIdToDependentExport.at(txId)) { OnNotifyResult(txId, id, Max(), txc); @@ -997,6 +992,10 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Self->PersistExportState(db, exportInfo); SendNotificationsIfFinished(exportInfo); + + if (exportInfo->IsFinished()) { + AuditLogExportEnd(*exportInfo.Get(), Self); + } } }; // TTxProgress diff --git a/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp b/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp index 0b7bcf62ffb0..d494c514c3df 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__cancel.cpp @@ -1,6 +1,7 @@ #include "schemeshard_xxport__tx_base.h" #include "schemeshard_import_flow_proposals.h" #include "schemeshard_import.h" +#include "schemeshard_audit_log.h" #include "schemeshard_impl.h" #include @@ -91,6 +92,11 @@ struct TSchemeShard::TImport::TTxCancel: public TSchemeShard::TXxport::TTxBase { Self->PersistImportState(db, importInfo); SendNotificationsIfFinished(importInfo); + + if (importInfo->IsFinished()) { + AuditLogImportEnd(*importInfo.Get(), Self); + } + return respond(Ydb::StatusIds::SUCCESS); default: @@ -191,6 +197,11 @@ struct TSchemeShard::TImport::TTxCancelAck: public TSchemeShard::TXxport::TTxBas Self->PersistImportState(db, importInfo); SendNotificationsIfFinished(importInfo); + + if (importInfo->IsFinished()) { + AuditLogImportEnd(*importInfo.Get(), Self); + } + return true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp index 25a66878d895..2ddf4c3051b6 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import__create.cpp @@ -1,8 +1,10 @@ #include "schemeshard_xxport__tx_base.h" +#include "schemeshard_xxport__helpers.h" #include "schemeshard_import_flow_proposals.h" #include "schemeshard_import_scheme_getter.h" #include "schemeshard_import_helpers.h" #include "schemeshard_import.h" +#include "schemeshard_audit_log.h" #include "schemeshard_impl.h" #include @@ -52,7 +54,7 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { ); } - const TString& uid = GetUid(request.GetRequest().GetOperationParams().labels()); + const TString& uid = GetUid(request.GetRequest().GetOperationParams()); if (uid) { if (auto it = Self->ImportsByUid.find(uid); it != Self->ImportsByUid.end()) { if (IsSameDomain(it->second, request.GetDatabaseName())) { @@ -101,7 +103,7 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { settings.set_scheme(Ydb::Import::ImportFromS3Settings::HTTPS); } - importInfo = new TImportInfo(id, uid, TImportInfo::EKind::S3, settings, domainPath.Base()->PathId); + importInfo = new TImportInfo(id, uid, TImportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName()); if (request.HasUserSID()) { importInfo->UserSID = request.GetUserSID(); @@ -148,15 +150,6 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } private: - static TString GetUid(const google::protobuf::Map& labels) { - auto it = labels.find("uid"); - if (it == labels.end()) { - return TString(); - } - - return it->second; - } - bool Reply( THolder response, const Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS, @@ -173,6 +166,8 @@ struct TSchemeShard::TImport::TTxCreate: public TSchemeShard::TXxport::TTxBase { AddIssue(entry, errorMessage); } + AuditLogImportStart(Request->Get()->Record, response->Record, Self); + Send(Request->Sender, std::move(response), 0, Request->Cookie); return true; @@ -1017,6 +1012,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase Self->PersistImportState(db, importInfo); SendNotificationsIfFinished(importInfo); + + if (importInfo->IsFinished()) { + AuditLogImportEnd(*importInfo.Get(), Self); + } } }; // TTxProgress diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index a13ef59e4de1..9ef8caa4a26d 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -2018,6 +2018,7 @@ TString TExportInfo::ToString() const { << " DomainPathId: " << DomainPathId << " ExportPathId: " << ExportPathId << " UserSID: '" << UserSID << "'" + << " PeerName: '" << PeerName << "'" << " State: " << State << " WaitTxId: " << WaitTxId << " Issue: '" << Issue << "'" diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 4a27cfa2bae1..279ad7fee624 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2608,12 +2608,13 @@ struct TExportInfo: public TSimpleRefCount { static bool IsDropped(const TItem& item); }; - ui64 Id; + ui64 Id; // TxId from the original TEvCreateExportRequest TString Uid; EKind Kind; TString Settings; TPathId DomainPathId; TMaybe UserSID; + TString PeerName; // required for making audit log records TVector Items; TPathId ExportPathId = InvalidPathId; @@ -2638,12 +2639,14 @@ struct TExportInfo: public TSimpleRefCount { const TString& uid, const EKind kind, const TString& settings, - const TPathId domainPathId) + const TPathId domainPathId, + const TString& peerName) : Id(id) , Uid(uid) , Kind(kind) , Settings(settings) , DomainPathId(domainPathId) + , PeerName(peerName) { } @@ -2653,8 +2656,9 @@ struct TExportInfo: public TSimpleRefCount { const TString& uid, const EKind kind, const TSettingsPB& settingsPb, - const TPathId domainPathId) - : TExportInfo(id, uid, kind, SerializeSettings(settingsPb), domainPathId) + const TPathId domainPathId, + const TString& peerName) + : TExportInfo(id, uid, kind, SerializeSettings(settingsPb), domainPathId, peerName) { } @@ -2765,12 +2769,13 @@ struct TImportInfo: public TSimpleRefCount { static bool IsDone(const TItem& item); }; - ui64 Id; + ui64 Id; // TxId from the original TEvCreateImportRequest TString Uid; EKind Kind; Ydb::Import::ImportFromS3Settings Settings; TPathId DomainPathId; TMaybe UserSID; + TString PeerName; // required for making audit log records EState State = EState::Invalid; TString Issue; @@ -2786,12 +2791,14 @@ struct TImportInfo: public TSimpleRefCount { const TString& uid, const EKind kind, const Ydb::Import::ImportFromS3Settings& settings, - const TPathId domainPathId) + const TPathId domainPathId, + const TString& peerName) : Id(id) , Uid(uid) , Kind(kind) , Settings(settings) , DomainPathId(domainPathId) + , PeerName(peerName) { } diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index ac7cce9805c8..d36c6d4f6532 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1160,6 +1160,7 @@ struct Schema : NIceDb::Schema { struct StartTime : Column<14, NScheme::NTypeIds::Uint64> {}; struct EndTime : Column<15, NScheme::NTypeIds::Uint64> {}; + struct PeerName : Column<16, NScheme::NTypeIds::Utf8> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -1177,7 +1178,8 @@ struct Schema : NIceDb::Schema { Kind, UserSID, StartTime, - EndTime + EndTime, + PeerName >; }; @@ -1482,6 +1484,7 @@ struct Schema : NIceDb::Schema { struct StartTime : Column<11, NScheme::NTypeIds::Uint64> {}; struct EndTime : Column<12, NScheme::NTypeIds::Uint64> {}; + struct PeerName : Column<13, NScheme::NTypeIds::Utf8> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -1496,7 +1499,8 @@ struct Schema : NIceDb::Schema { Issue, UserSID, StartTime, - EndTime + EndTime, + PeerName >; }; diff --git a/ydb/core/tx/schemeshard/schemeshard_xxport__helpers.cpp b/ydb/core/tx/schemeshard/schemeshard_xxport__helpers.cpp new file mode 100644 index 000000000000..8990964b78d3 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_xxport__helpers.cpp @@ -0,0 +1,14 @@ +#include + +namespace NKikimr::NSchemeShard { + +TString GetUid(const Ydb::Operations::OperationParams& operationParams) { + const auto& labels = operationParams.labels(); + auto it = labels.find("uid"); + if (it != labels.end()) { + return it->second; + } + return {}; +} + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/schemeshard_xxport__helpers.h b/ydb/core/tx/schemeshard/schemeshard_xxport__helpers.h new file mode 100644 index 000000000000..e6fafda10fb0 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_xxport__helpers.h @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace Ydb::Operations { + class OperationParams; +} + +namespace NKikimr::NSchemeShard { + +TString GetUid(const Ydb::Operations::OperationParams& operationParams); + +} // NKikimr::NSchemeShard diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index b33027c91af7..f2e82ce2c7d7 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -19,7 +20,7 @@ namespace { void Run(TTestBasicRuntime& runtime, TTestEnv& env, const TVector& tables, const TString& request, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS, - const TString& dbName = "/MyRoot", bool serverless = false, const TString& userSID = "") { + const TString& dbName = "/MyRoot", bool serverless = false, const TString& userSID = "", const TString& peerName = "") { ui64 txId = 100; @@ -104,7 +105,7 @@ namespace { const auto initialStatus = expectedStatus == Ydb::StatusIds::PRECONDITION_FAILED ? expectedStatus : Ydb::StatusIds::SUCCESS; - TestExport(runtime, schemeshardId, ++txId, dbName, request, userSID, initialStatus); + TestExport(runtime, schemeshardId, ++txId, dbName, request, userSID, peerName, initialStatus); env.TestWaitNotification(runtime, txId, schemeshardId); if (initialStatus != Ydb::StatusIds::SUCCESS) { @@ -124,6 +125,9 @@ namespace { void Cancel(const TVector& tables, const TString& request, TDelayFunc delayFunc) { TTestBasicRuntime runtime; + std::vector auditLines; + runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + TTestEnv env(runtime); ui64 txId = 100; @@ -147,6 +151,22 @@ namespace { TestExport(runtime, ++txId, "/MyRoot", request); const ui64 exportId = txId; + // Check audit record for export start + { + auto line = FindAuditLine(auditLines, "operation=EXPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId)); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value + UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT(!line.contains("start_time")); + UNIT_ASSERT(!line.contains("end_time")); + } + if (!delayed) { TDispatchOptions opts; opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { @@ -161,6 +181,23 @@ namespace { runtime.Send(delayed.Release(), 0, true); env.TestWaitNotification(runtime, exportId); + // Check audit record for export end + // + { + auto line = FindAuditLine(auditLines, "operation=EXPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId)); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address="); // can't check the value + UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=ERROR"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=CANCELLED"); + UNIT_ASSERT_STRING_CONTAINS(line, "reason=Cancelled"); + UNIT_ASSERT_STRING_CONTAINS(line, "start_time="); + UNIT_ASSERT_STRING_CONTAINS(line, "end_time="); + } + TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); TestForgetExport(runtime, ++txId, "/MyRoot", exportId); @@ -1708,6 +1745,235 @@ partitioning_settings { UNIT_ASSERT_LT(entry.GetStartTime().seconds(), entry.GetEndTime().seconds()); } + // Based on CompletedExportEndTime + Y_UNIT_TEST(AuditCompletedExport) { + TTestBasicRuntime runtime; + std::vector auditLines; + runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + + TTestEnv env(runtime); + + runtime.UpdateCurrentTime(TInstant::Now()); + ui64 txId = 100; + + // Prepare table to export + // + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + // Start export + // + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + const auto request = Sprintf(R"( + OperationParams { + labels { + key: "uid" + value: "foo" + } + } + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", port); + TestExport(runtime, ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876"); + + // Check audit record for export start + { + auto line = FindAuditLine(auditLines, "operation=EXPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", txId)); + UNIT_ASSERT_STRING_CONTAINS(line, "uid=foo"); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address=127.0.0.1"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject=user@builtin"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT(!line.contains("start_time")); + UNIT_ASSERT(!line.contains("end_time")); + } + + // Do export + // + runtime.AdvanceCurrentTime(TDuration::Seconds(30)); + + env.TestWaitNotification(runtime, txId); + + const auto desc = TestGetExport(runtime, txId, "/MyRoot"); + const auto& entry = desc.GetResponse().GetEntry(); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_DONE); + UNIT_ASSERT(entry.HasStartTime()); + UNIT_ASSERT(entry.HasEndTime()); + UNIT_ASSERT_LT(entry.GetStartTime().seconds(), entry.GetEndTime().seconds()); + + // Check audit record for export end + // + { + auto line = FindAuditLine(auditLines, "operation=EXPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", txId)); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address=127.0.0.1"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject=user@builtin"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT_STRING_CONTAINS(line, "start_time="); + UNIT_ASSERT_STRING_CONTAINS(line, "end_time="); + } + } + + Y_UNIT_TEST(AuditCancelledExport) { + TTestBasicRuntime runtime; + std::vector auditLines; + runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + + TTestEnv env(runtime); + + runtime.UpdateCurrentTime(TInstant::Now()); + ui64 txId = 100; + + // Prepare table to export + // + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + auto delayFunc = [](TAutoPtr& ev) { + if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) { + return false; + } + + return ev->Get()->Record + .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpBackup; + }; + + THolder delayed; + auto prevObserver = runtime.SetObserverFunc([&](TAutoPtr& ev) { + if (delayFunc(ev)) { + delayed.Reset(ev.Release()); + return TTestActorRuntime::EEventAction::DROP; + } + return TTestActorRuntime::EEventAction::PROCESS; + }); + + // Start export + // + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + const auto request = Sprintf(R"( + OperationParams { + labels { + key: "uid" + value: "foo" + } + } + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", port); + TestExport(runtime, ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876"); + const ui64 exportId = txId; + + // Check audit record for export start + { + auto line = FindAuditLine(auditLines, "operation=EXPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId)); + UNIT_ASSERT_STRING_CONTAINS(line, "uid=foo"); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address=127.0.0.1"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject=user@builtin"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT(!line.contains("start_time")); + UNIT_ASSERT(!line.contains("end_time")); + } + + // Do export (unsuccessfully) + // + runtime.AdvanceCurrentTime(TDuration::Seconds(30)); + + if (!delayed) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&delayed](IEventHandle&) -> bool { + return bool(delayed); + }); + runtime.DispatchEvents(opts); + } + runtime.SetObserverFunc(prevObserver); + + // Cancel export mid-air + // + TestCancelExport(runtime, ++txId, "/MyRoot", exportId); + + auto desc = TestGetExport(runtime, exportId, "/MyRoot"); + auto entry = desc.GetResponse().GetEntry(); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLATION); + UNIT_ASSERT(entry.HasStartTime()); + UNIT_ASSERT(!entry.HasEndTime()); + + runtime.Send(delayed.Release(), 0, true); + env.TestWaitNotification(runtime, exportId); + + desc = TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::CANCELLED); + entry = desc.GetResponse().GetEntry(); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Export::ExportProgress::PROGRESS_CANCELLED); + UNIT_ASSERT(entry.HasStartTime()); + UNIT_ASSERT(entry.HasEndTime()); + UNIT_ASSERT_LT(entry.GetStartTime().seconds(), entry.GetEndTime().seconds()); + + // Check audit record for export end + // + { + auto line = FindAuditLine(auditLines, "operation=EXPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=EXPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", exportId)); + UNIT_ASSERT_STRING_CONTAINS(line, "uid=foo"); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address=127.0.0.1"); // can't check the value + UNIT_ASSERT_STRING_CONTAINS(line, "subject=user@builtin"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=ERROR"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=CANCELLED"); + UNIT_ASSERT_STRING_CONTAINS(line, "reason=Cancelled"); + UNIT_ASSERT_STRING_CONTAINS(line, "start_time="); + UNIT_ASSERT_STRING_CONTAINS(line, "end_time="); + } + } + Y_UNIT_TEST(ExportPartitioningSettings) { TPortManager portManager; const ui16 port = portManager.GetPort(); diff --git a/ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.cpp new file mode 100644 index 000000000000..9826eafc523a --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.cpp @@ -0,0 +1,53 @@ +#include +#include +#include + +#include +#include +#include + +#include + +#include "auditlog_helpers.h" + +namespace NSchemeShardUT_Private { + +namespace { + +class TMemoryLogBackend: public TLogBackend { +public: + std::vector& Buffer; + + TMemoryLogBackend(std::vector& buffer) + : Buffer(buffer) + {} + + virtual void WriteData(const TLogRecord& rec) override { + Buffer.emplace_back(rec.Data, rec.Len); + } + + virtual void ReopenLog() override { + } +}; + +} // anonymous namespace + +NAudit::TAuditLogBackends CreateTestAuditLogBackends(std::vector& lineBuffer) { + NAudit::TAuditLogBackends logBackends; + logBackends[NKikimrConfig::TAuditConfig::TXT].emplace_back(new TMemoryLogBackend(lineBuffer)); + return logBackends; +} + +std::string FindAuditLine(const std::vector& auditLines, const std::string& substr) { + Cerr << "AUDIT LOG buffer(" << auditLines.size() << "):" << Endl; + for (auto i : auditLines) { + Cerr << " " << i << Endl; + } + auto found = std::find_if(auditLines.begin(), auditLines.end(), [&](auto i) { return i.contains(substr); }); + UNIT_ASSERT_C(found != auditLines.end(), "No audit record with substring: '" + substr + "'"); + auto line = *found; + Cerr << "AUDIT LOG checked line:" << Endl << " " << line << Endl; + return line; +} + +} // namespace NSchemeShardUT_Private diff --git a/ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.h b/ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.h new file mode 100644 index 000000000000..b7d57ba3cb70 --- /dev/null +++ b/ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +#include + +namespace NSchemeShardUT_Private { + +using namespace NKikimr; + +NAudit::TAuditLogBackends CreateTestAuditLogBackends(std::vector& lineBuffer); + +std::string FindAuditLine(const std::vector& auditLines, const std::string& substr); + +} // namespace NSchemeShardUT_Private diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index 0fe031eb3cf8..5886728811f3 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -1026,7 +1026,7 @@ namespace NSchemeShardUT_Private { return result; } - void AsyncExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID) { + void AsyncExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName) { NKikimrExport::TCreateExportRequest request; UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(requestStr, &request)); @@ -1044,26 +1044,29 @@ namespace NSchemeShardUT_Private { if (userSID) { ev->Record.SetUserSID(userSID); } + if (peerName) { + ev->Record.SetPeerName(peerName); + } AsyncSend(runtime, schemeshardId, ev.Release()); } - void AsyncExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID) { - AsyncExport(runtime, TTestTxConfig::SchemeShard, id, dbName, requestStr, userSID); + void AsyncExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName) { + AsyncExport(runtime, TTestTxConfig::SchemeShard, id, dbName, requestStr, userSID, peerName); } - void TestExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, + void TestExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName, Ydb::StatusIds::StatusCode expectedStatus) { - AsyncExport(runtime, schemeshardId, id, dbName, requestStr, userSID); + AsyncExport(runtime, schemeshardId, id, dbName, requestStr, userSID, peerName); TAutoPtr handle; auto ev = runtime.GrabEdgeEvent(handle); UNIT_ASSERT_EQUAL(ev->Record.GetResponse().GetEntry().GetStatus(), expectedStatus); } - void TestExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, + void TestExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName, Ydb::StatusIds::StatusCode expectedStatus) { - TestExport(runtime, TTestTxConfig::SchemeShard, id, dbName, requestStr, userSID, expectedStatus); + TestExport(runtime, TTestTxConfig::SchemeShard, id, dbName, requestStr, userSID, peerName, expectedStatus); } NKikimrExport::TEvGetExportResponse TestGetExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, @@ -1152,7 +1155,7 @@ namespace NSchemeShardUT_Private { return TestForgetExport(runtime, TTestTxConfig::SchemeShard, txId, dbName, exportId, expectedStatus); } - void AsyncImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID) { + void AsyncImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName) { NKikimrImport::TCreateImportRequest request; UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(requestStr, &request)); @@ -1160,26 +1163,29 @@ namespace NSchemeShardUT_Private { if (userSID) { ev->Record.SetUserSID(userSID); } + if (peerName) { + ev->Record.SetPeerName(peerName); + } AsyncSend(runtime, schemeshardId, ev.Release()); } - void AsyncImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID) { - AsyncImport(runtime, TTestTxConfig::SchemeShard, id, dbName, requestStr, userSID); + void AsyncImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName) { + AsyncImport(runtime, TTestTxConfig::SchemeShard, id, dbName, requestStr, userSID, peerName); } - void TestImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, + void TestImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName, Ydb::StatusIds::StatusCode expectedStatus) { - AsyncImport(runtime, schemeshardId, id, dbName, requestStr, userSID); + AsyncImport(runtime, schemeshardId, id, dbName, requestStr, userSID, peerName); TAutoPtr handle; auto ev = runtime.GrabEdgeEvent(handle); UNIT_ASSERT_EQUAL(ev->Record.GetResponse().GetEntry().GetStatus(), expectedStatus); } - void TestImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, + void TestImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID, const TString& peerName, Ydb::StatusIds::StatusCode expectedStatus) { - TestImport(runtime, TTestTxConfig::SchemeShard, id, dbName, requestStr, userSID, expectedStatus); + TestImport(runtime, TTestTxConfig::SchemeShard, id, dbName, requestStr, userSID, peerName, expectedStatus); } NKikimrImport::TEvGetImportResponse TestGetImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index e59f1d3fe07d..94222eaaaa10 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -370,11 +370,11 @@ namespace NSchemeShardUT_Private { ////////// export TVector GetExportTargetPaths(const TString& requestStr); - void AsyncExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = ""); - void AsyncExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = ""); - void TestExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", + void AsyncExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", const TString& peerName = ""); + void AsyncExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", const TString& peerName = ""); + void TestExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", const TString& peerName = "", Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS); - void TestExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", + void TestExport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", const TString& peerName = "", Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS); NKikimrExport::TEvGetExportResponse TestGetExport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TVector& expectedStatuses); @@ -398,11 +398,11 @@ namespace NSchemeShardUT_Private { Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS); ////////// import - void AsyncImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = ""); - void AsyncImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = ""); - void TestImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", + void AsyncImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", const TString& peerName = ""); + void AsyncImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", const TString& peerName = ""); + void TestImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", const TString& peerName = "", Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS); - void TestImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", + void TestImport(TTestActorRuntime& runtime, ui64 id, const TString& dbName, const TString& requestStr, const TString& userSID = "", const TString& peerName = "", Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS); NKikimrImport::TEvGetImportResponse TestGetImport(TTestActorRuntime& runtime, ui64 schemeshardId, ui64 id, const TString& dbName, const TVector& expectedStatuses); diff --git a/ydb/core/tx/schemeshard/ut_helpers/ya.make b/ydb/core/tx/schemeshard/ut_helpers/ya.make index 89f867d6c154..353849a5f51a 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/ya.make +++ b/ydb/core/tx/schemeshard/ut_helpers/ya.make @@ -25,6 +25,8 @@ PEERDIR( ) SRCS( + auditlog_helpers.cpp + auditlog_helpers.h export_reboots_common.cpp failing_mtpq.cpp helpers.cpp diff --git a/ydb/core/tx/schemeshard/ut_login/ut_login.cpp b/ydb/core/tx/schemeshard/ut_login/ut_login.cpp index 6c872c263cce..1401936c99f7 100644 --- a/ydb/core/tx/schemeshard/ut_login/ut_login.cpp +++ b/ydb/core/tx/schemeshard/ut_login/ut_login.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include @@ -23,26 +24,6 @@ void TestCreateAlterLoginCreateUser(TTestActorRuntime& runtime, ui64 txId, const } // namespace NSchemeShardUT_Private -namespace { - -class TMemoryLogBackend: public TLogBackend { -public: - std::vector& Buffer; - - TMemoryLogBackend(std::vector& buffer) - : Buffer(buffer) - {} - - virtual void WriteData(const TLogRecord& rec) override { - Buffer.emplace_back(rec.Data, rec.Len); - } - - virtual void ReopenLog() override { - } -}; - -} // anonymous namespace - Y_UNIT_TEST_SUITE(TSchemeShardLoginTest) { Y_UNIT_TEST(BasicLogin) { TTestBasicRuntime runtime; @@ -83,12 +64,6 @@ Y_UNIT_TEST_SUITE(TSchemeShardLoginTest) { UNIT_ASSERT(describe.GetPathDescription().GetDomainDescription().GetSecurityState().PublicKeysSize() > 0); } - NAudit::TAuditLogBackends CreateTestAuditLogBackends(std::vector& buffer) { - NAudit::TAuditLogBackends logBackends; - logBackends[NKikimrConfig::TAuditConfig::TXT].emplace_back(new TMemoryLogBackend(buffer)); - return logBackends; - } - Y_UNIT_TEST(AuditLogLoginSuccess) { TTestBasicRuntime runtime; std::vector lines; @@ -109,10 +84,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardLoginTest) { } UNIT_ASSERT_VALUES_EQUAL(lines.size(), 3); // +user login - Cerr << "auditlog lines:\n" << JoinSeq('\n', lines) << Endl; - auto last = lines[lines.size() - 1]; - Cerr << "auditlog last line:\n" << last << Endl; - + auto last = FindAuditLine(lines, "operation=LOGIN"); UNIT_ASSERT_STRING_CONTAINS(last, "component=schemeshard"); UNIT_ASSERT_STRING_CONTAINS(last, "remote_address="); // can't check the value UNIT_ASSERT_STRING_CONTAINS(last, "database=/MyRoot"); @@ -143,10 +115,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardLoginTest) { } UNIT_ASSERT_VALUES_EQUAL(lines.size(), 3); // +user login - Cerr << "auditlog lines:\n" << JoinSeq('\n', lines) << Endl; - auto last = lines[lines.size() - 1]; - Cerr << "auditlog last line:\n" << last << Endl; - + auto last = FindAuditLine(lines, "operation=LOGIN"); UNIT_ASSERT_STRING_CONTAINS(last, "component=schemeshard"); UNIT_ASSERT_STRING_CONTAINS(last, "remote_address="); // can't check the value UNIT_ASSERT_STRING_CONTAINS(last, "database=/MyRoot"); diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index d9a43c4c4473..cee6f82143e7 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -1206,7 +1207,7 @@ value { const TVector keyTags = {1}; TVector valueTags(values.size()); - std::iota(valueTags.begin(), valueTags.end(), 2); + std::iota(valueTags.begin(), valueTags.end(), 2); UploadRow(runtime, "/MyRoot/Table", partitionIdx, keyTags, valueTags, keys, values); @@ -1298,7 +1299,7 @@ value { "jsondoc_value", "uuid_value", }; - + auto contentOriginalTable = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", readKeyDesc, readColumns); NKqp::CompareYson(expectedJson, contentOriginalTable); @@ -2166,7 +2167,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { void Run(TTestBasicRuntime& runtime, TTestEnv& env, THashMap&& data, const TString& request, Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS, - const TString& dbName = "/MyRoot", bool serverless = false, const TString& userSID = "") + const TString& dbName = "/MyRoot", bool serverless = false, const TString& userSID = "", const TString& peerName = "") { ui64 id = 100; @@ -2256,7 +2257,7 @@ Y_UNIT_TEST_SUITE(TImportTests) { break; } - TestImport(runtime, schemeshardId, ++id, dbName, Sprintf(request.data(), port), userSID, initialStatus); + TestImport(runtime, schemeshardId, ++id, dbName, Sprintf(request.data(), port), userSID, peerName, initialStatus); env.TestWaitNotification(runtime, id, schemeshardId); if (initialStatus != Ydb::StatusIds::SUCCESS) { @@ -3090,6 +3091,9 @@ Y_UNIT_TEST_SUITE(TImportTests) { void CancelShouldSucceed(TDelayFunc delayFunc) { TTestBasicRuntime runtime; + std::vector auditLines; + runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + TTestEnv env(runtime, TTestEnvOptions()); ui64 txId = 100; @@ -3134,12 +3138,45 @@ Y_UNIT_TEST_SUITE(TImportTests) { )", port)); const ui64 importId = txId; + // Check audit record for import start + { + auto line = FindAuditLine(auditLines, "operation=IMPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=IMPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", importId)); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address={none}"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT(!line.contains("start_time")); + UNIT_ASSERT(!line.contains("end_time")); + } + WaitForDelayed(runtime, delayed, prevObserver); TestCancelImport(runtime, ++txId, "/MyRoot", importId); runtime.Send(delayed.Release(), 0, true); env.TestWaitNotification(runtime, importId); + // Check audit record for import end + // + { + auto line = FindAuditLine(auditLines, "operation=IMPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=IMPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", importId)); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address={none}"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject={none}"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=ERROR"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=CANCELLED"); + UNIT_ASSERT_STRING_CONTAINS(line, "reason=Cancelled"); + UNIT_ASSERT_STRING_CONTAINS(line, "start_time="); + UNIT_ASSERT_STRING_CONTAINS(line, "end_time="); + } + TestGetImport(runtime, importId, "/MyRoot", Ydb::StatusIds::CANCELLED); } @@ -3430,6 +3467,218 @@ Y_UNIT_TEST_SUITE(TImportTests) { UNIT_ASSERT_LT(entry.GetStartTime().seconds(), entry.GetEndTime().seconds()); } + // Based on CompletedImportEndTime + Y_UNIT_TEST(AuditCompletedImport) { + TTestBasicRuntime runtime; + std::vector auditLines; + runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + + TTestEnv env(runtime); + + runtime.UpdateCurrentTime(TInstant::Now()); + ui64 txId = 100; + + const auto data = GenerateTestData(R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )", {{"a", 1}}); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + const auto request = Sprintf(R"( + OperationParams { + labels { + key: "uid" + value: "foo" + } + } + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "" + destination_path: "/MyRoot/Table" + } + } + )", port); + TestImport(runtime, ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876"); + + // Check audit record for import start + { + auto line = FindAuditLine(auditLines, "operation=IMPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=IMPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", txId)); + UNIT_ASSERT_STRING_CONTAINS(line, "uid=foo"); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address=127.0.0.1"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject=user@builtin"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT(!line.contains("start_time")); + UNIT_ASSERT(!line.contains("end_time")); + } + + runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing import + + env.TestWaitNotification(runtime, txId); + + // Check audit record for import end + // + { + auto line = FindAuditLine(auditLines, "operation=IMPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=IMPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", txId)); + UNIT_ASSERT_STRING_CONTAINS(line, "uid=foo"); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address=127.0.0.1"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject=user@builtin"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT_STRING_CONTAINS(line, "start_time="); + UNIT_ASSERT_STRING_CONTAINS(line, "end_time="); + } + + const auto desc = TestGetImport(runtime, txId, "/MyRoot"); + const auto& entry = desc.GetResponse().GetEntry(); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_DONE); + UNIT_ASSERT(entry.HasStartTime()); + UNIT_ASSERT(entry.HasEndTime()); + UNIT_ASSERT_LT(entry.GetStartTime().seconds(), entry.GetEndTime().seconds()); + } + + // Based on CancelledImportEndTime + Y_UNIT_TEST(AuditCancelledImport) { + TTestBasicRuntime runtime; + std::vector auditLines; + runtime.AuditLogBackends = std::move(CreateTestAuditLogBackends(auditLines)); + + TTestEnv env(runtime); + + runtime.UpdateCurrentTime(TInstant::Now()); + ui64 txId = 100; + + const auto data = GenerateTestData(R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )", {{"a", 1}}); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + auto delayFunc = [](TAutoPtr& ev) { + if (ev->GetTypeRewrite() != TEvSchemeShard::EvModifySchemeTransaction) { + return false; + } + + return ev->Get()->Record + .GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpRestore; + }; + + THolder delayed; + auto prevObserver = SetDelayObserver(runtime, delayed, delayFunc); + + const auto request = Sprintf(R"( + OperationParams { + labels { + key: "uid" + value: "foo" + } + } + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "" + destination_path: "/MyRoot/Table" + } + } + )", port); + TestImport(runtime, ++txId, "/MyRoot", request, /*userSID*/ "user@builtin", /*peerName*/ "127.0.0.1:9876"); + const ui64 importId = txId; + + // Check audit record for import start + { + auto line = FindAuditLine(auditLines, "operation=IMPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=IMPORT START"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", importId)); + UNIT_ASSERT_STRING_CONTAINS(line, "uid=foo"); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address=127.0.0.1"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject=user@builtin"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=SUCCESS"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=SUCCESS"); + UNIT_ASSERT(!line.contains("reason")); + UNIT_ASSERT(!line.contains("start_time")); + UNIT_ASSERT(!line.contains("end_time")); + } + + runtime.AdvanceCurrentTime(TDuration::Seconds(30)); // doing import + + WaitForDelayed(runtime, delayed, prevObserver); + + TestCancelImport(runtime, ++txId, "/MyRoot", importId); + + auto desc = TestGetImport(runtime, importId, "/MyRoot"); + auto entry = desc.GetResponse().GetEntry(); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLATION); + UNIT_ASSERT(entry.HasStartTime()); + UNIT_ASSERT(!entry.HasEndTime()); + + runtime.Send(delayed.Release(), 0, true); + env.TestWaitNotification(runtime, importId); + + desc = TestGetImport(runtime, importId, "/MyRoot", Ydb::StatusIds::CANCELLED); + entry = desc.GetResponse().GetEntry(); + UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED); + UNIT_ASSERT(entry.HasStartTime()); + UNIT_ASSERT(entry.HasEndTime()); + UNIT_ASSERT_LT(entry.GetStartTime().seconds(), entry.GetEndTime().seconds()); + + // Check audit record for import end + // + { + auto line = FindAuditLine(auditLines, "operation=IMPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, "component=schemeshard"); + UNIT_ASSERT_STRING_CONTAINS(line, "operation=IMPORT END"); + UNIT_ASSERT_STRING_CONTAINS(line, Sprintf("id=%lu", importId)); + UNIT_ASSERT_STRING_CONTAINS(line, "uid=foo"); + UNIT_ASSERT_STRING_CONTAINS(line, "remote_address=127.0.0.1"); + UNIT_ASSERT_STRING_CONTAINS(line, "subject=user@builtin"); + UNIT_ASSERT_STRING_CONTAINS(line, "database=/MyRoot"); + UNIT_ASSERT_STRING_CONTAINS(line, "status=ERROR"); + UNIT_ASSERT_STRING_CONTAINS(line, "detailed_status=CANCELLED"); + UNIT_ASSERT_STRING_CONTAINS(line, "reason=Cancelled"); + UNIT_ASSERT_STRING_CONTAINS(line, "start_time="); + UNIT_ASSERT_STRING_CONTAINS(line, "end_time="); + } + } + Y_UNIT_TEST(UserSID) { TTestBasicRuntime runtime; TTestEnv env(runtime); diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 975514b27a12..dad02ce621bc 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -207,6 +207,7 @@ SRCS( schemeshard_utils.cpp schemeshard_utils.h schemeshard_bg_tasks__list.cpp + schemeshard_xxport__helpers.cpp schemeshard_export__cancel.cpp schemeshard_export__create.cpp schemeshard_export__forget.cpp diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 1a4f2de0d060..804ca76f1716 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -2952,6 +2952,11 @@ "ColumnId": 15, "ColumnName": "EndTime", "ColumnType": "Uint64" + }, + { + "ColumnId": 16, + "ColumnName": "PeerName", + "ColumnType": "Utf8" } ], "ColumnsDropped": [], @@ -2972,7 +2977,8 @@ 12, 13, 14, - 15 + 15, + 16 ], "RoomID": 0, "Codec": 0, @@ -5717,6 +5723,11 @@ "ColumnId": 12, "ColumnName": "EndTime", "ColumnType": "Uint64" + }, + { + "ColumnId": 13, + "ColumnName": "PeerName", + "ColumnType": "Utf8" } ], "ColumnsDropped": [], @@ -5734,7 +5745,8 @@ 9, 10, 11, - 12 + 12, + 13 ], "RoomID": 0, "Codec": 0,