diff --git a/ydb/core/backup/common/checksum.cpp b/ydb/core/backup/common/checksum.cpp index 006ad48cd8b7..57b2c6fbf382 100644 --- a/ydb/core/backup/common/checksum.cpp +++ b/ydb/core/backup/common/checksum.cpp @@ -4,6 +4,19 @@ #include +namespace { + +template +void FillArrayFromProto(T (&array)[N], const NProtoBuf::RepeatedField& proto) { + for (int i = 0; i < proto.size(); ++i) { + if (static_cast(i) < std::size(array)) { + array[i] = proto.Get(i); + } + } +} + +} // anonymous + namespace NKikimr::NBackup { class TSHA256 : public IChecksum { @@ -16,12 +29,41 @@ class TSHA256 : public IChecksum { SHA256_Update(&Context, data.data(), data.size()); } - TString Serialize() override { + TString Finalize() override { unsigned char hash[SHA256_DIGEST_LENGTH]; SHA256_Final(hash, &Context); return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH)); } + TChecksumState GetState() const override { + TChecksumState state; + auto& sha256State = *state.MutableSha256State(); + + for (ui32 h : Context.h) { + sha256State.AddH(h); + } + sha256State.SetNh(Context.Nh); + sha256State.SetNl(Context.Nl); + for (ui32 data : Context.data) { + sha256State.AddData(data); + } + sha256State.SetNum(Context.num); + sha256State.SetMdLen(Context.md_len); + + return state; + } + + void Continue(const TChecksumState& state) override { + const auto& sha256State = state.GetSha256State(); + SHA256_Init(&Context); + FillArrayFromProto(Context.h, sha256State.GetH()); + Context.Nh = sha256State.GetNh(); + Context.Nl = sha256State.GetNl(); + FillArrayFromProto(Context.data, sha256State.GetData()); + Context.num = sha256State.GetNum(); + Context.md_len = sha256State.GetMdLen(); + } + private: SHA256_CTX Context; }; @@ -29,7 +71,7 @@ class TSHA256 : public IChecksum { TString ComputeChecksum(TStringBuf data) { IChecksum::TPtr checksum(CreateChecksum()); checksum->AddData(data); - return checksum->Serialize(); + return checksum->Finalize(); } IChecksum* CreateChecksum() { @@ -40,4 +82,4 @@ TString ChecksumKey(const TString& objKey) { return objKey + ".sha256"; } -} // NKikimr::NDataShard +} // NKikimr::NBackup diff --git a/ydb/core/backup/common/checksum.h b/ydb/core/backup/common/checksum.h index 4da5dbbe968c..a647324ff41b 100644 --- a/ydb/core/backup/common/checksum.h +++ b/ydb/core/backup/common/checksum.h @@ -1,9 +1,13 @@ #pragma once +#include + #include namespace NKikimr::NBackup { +using NKikimrBackup::TChecksumState; + class IChecksum { public: using TPtr = std::unique_ptr; @@ -11,7 +15,10 @@ class IChecksum { virtual ~IChecksum() = default; virtual void AddData(TStringBuf data) = 0; - virtual TString Serialize() = 0; + virtual TString Finalize() = 0; + + virtual TChecksumState GetState() const = 0; + virtual void Continue(const TChecksumState& state) = 0; }; IChecksum* CreateChecksum(); diff --git a/ydb/core/backup/common/metadata.cpp b/ydb/core/backup/common/metadata.cpp index 3b14d50e4303..fd70983acaa3 100644 --- a/ydb/core/backup/common/metadata.cpp +++ b/ydb/core/backup/common/metadata.cpp @@ -23,7 +23,9 @@ ui64 TMetadata::GetVersion() const { TString TMetadata::Serialize() const { NJson::TJsonMap m; - m["version"] = *Version; + if (Version.Defined()) { + m["version"] = *Version; + } NJson::TJsonArray fullBackups; for (auto &[tp, b] : FullBackups) { diff --git a/ydb/core/protos/checksum.proto b/ydb/core/protos/checksum.proto new file mode 100644 index 000000000000..d6f681c6bc61 --- /dev/null +++ b/ydb/core/protos/checksum.proto @@ -0,0 +1,19 @@ +package NKikimrBackup; +option java_package = "ru.yandex.kikimr.proto"; + +// Corresponds to the OpenSSL SHA256_CTX structure. +message TSha256State { + repeated uint32 H = 1; + optional uint32 Nh = 2; + optional uint32 Nl = 3; + repeated uint32 Data = 4; + optional uint32 Num = 5; + optional uint32 MdLen = 6; +} + +// Used to serialize the intermediate state of a checksum. +message TChecksumState { + oneof state { + TSha256State Sha256State = 1; + } +} diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 83a180afb198..c4d6eb718800 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -184,7 +184,7 @@ message TFeatureFlags { optional bool EnableDataShardInMemoryStateMigration = 159 [default = true]; optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false]; optional bool DisableLocalDBEraseCache = 161 [default = false]; - optional bool EnableExportChecksums = 162 [default = false]; + optional bool EnableChecksumsExport = 162 [default = false]; optional bool EnableTopicTransfer = 163 [default = false]; optional bool EnableViewExport = 164 [default = false]; optional bool EnableColumnStore = 165 [default = false]; diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index d2f4bec42af2..89b69a9541a1 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -31,6 +31,7 @@ SRCS( bootstrapper.proto change_exchange.proto channel_purpose.proto + checksum.proto cms.proto compaction.proto compile_service_config.proto diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index d919a4d05c25..034d6ff1d6a3 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -71,7 +71,7 @@ class TTestFeatureFlagsHolder { FEATURE_FLAG_SETTER(EnableParameterizedDecimal) FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForCDC) FEATURE_FLAG_SETTER(EnableFollowerStats) - FEATURE_FLAG_SETTER(EnableExportChecksums) + FEATURE_FLAG_SETTER(EnableChecksumsExport) FEATURE_FLAG_SETTER(EnableTopicTransfer) FEATURE_FLAG_SETTER(EnableStrictUserManagement) FEATURE_FLAG_SETTER(EnableDatabaseAdmin) diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 3bb70f6291a0..c36c6331bb1f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -52,6 +52,7 @@ #include #include #include +#include #include #include @@ -789,9 +790,18 @@ class TDataShard struct ProcessedBytes : Column<4, NScheme::NTypeIds::Uint64> {}; struct WrittenBytes : Column<5, NScheme::NTypeIds::Uint64> {}; struct WrittenRows : Column<6, NScheme::NTypeIds::Uint64> {}; + struct ChecksumState : Column<7, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TChecksumState; }; using TKey = TableKey; - using TColumns = TableColumns; + using TColumns = TableColumns< + TxId, + SchemeETag, + DataETag, + ProcessedBytes, + WrittenBytes, + WrittenRows, + ChecksumState + >; }; struct ChangeRecords : Table<17> { diff --git a/ydb/core/tx/datashard/datashard_s3_download.h b/ydb/core/tx/datashard/datashard_s3_download.h index eeb9fb213c0f..d75e41cce1b8 100644 --- a/ydb/core/tx/datashard/datashard_s3_download.h +++ b/ydb/core/tx/datashard/datashard_s3_download.h @@ -1,5 +1,7 @@ #pragma once +#include + #include namespace NKikimr { @@ -10,6 +12,7 @@ struct TS3Download { ui64 ProcessedBytes = 0; ui64 WrittenBytes = 0; ui64 WrittenRows = 0; + NKikimrBackup::TChecksumState ChecksumState; void Out(IOutputStream& out) const { out << "{" @@ -17,6 +20,7 @@ struct TS3Download { << " ProcessedBytes: " << ProcessedBytes << " WrittenBytes: " << WrittenBytes << " WrittenRows: " << WrittenRows + << " ChecksumState: " << ChecksumState.ShortDebugString() << " }"; } }; diff --git a/ydb/core/tx/datashard/datashard_s3_downloads.cpp b/ydb/core/tx/datashard/datashard_s3_downloads.cpp index ed5f5b024388..49963761b95f 100644 --- a/ydb/core/tx/datashard/datashard_s3_downloads.cpp +++ b/ydb/core/tx/datashard/datashard_s3_downloads.cpp @@ -24,6 +24,10 @@ bool TS3DownloadsManager::Load(NIceDb::TNiceDb& db) { info.WrittenBytes = rowset.GetValueOrDefault(0); info.WrittenRows = rowset.GetValueOrDefault(0); + if (rowset.HaveValue()) { + info.ChecksumState = rowset.GetValue(); + } + if (!rowset.Next()) { ready = false; break; @@ -56,7 +60,9 @@ const TS3Download& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, ui64 txId, co NIceDb::TUpdate(*newInfo.DataETag), NIceDb::TUpdate(newInfo.ProcessedBytes), NIceDb::TUpdate(newInfo.WrittenBytes), - NIceDb::TUpdate(newInfo.WrittenRows)); + NIceDb::TUpdate(newInfo.WrittenRows), + NIceDb::TUpdate(newInfo.ChecksumState) + ); return info; } diff --git a/ydb/core/tx/datashard/export_s3_buffer.cpp b/ydb/core/tx/datashard/export_s3_buffer.cpp index 6a97fa4bcd90..c44f60b7546d 100644 --- a/ydb/core/tx/datashard/export_s3_buffer.cpp +++ b/ydb/core/tx/datashard/export_s3_buffer.cpp @@ -297,7 +297,7 @@ IEventBase* TS3Buffer::PrepareEvent(bool last, NExportScan::IBuffer::TStats& sta stats.BytesSent = buffer->Size(); if (Checksum && last) { - return new TEvExportScan::TEvBuffer(std::move(*buffer), last, Checksum->Serialize()); + return new TEvExportScan::TEvBuffer(std::move(*buffer), last, Checksum->Finalize()); } else { return new TEvExportScan::TEvBuffer(std::move(*buffer), last); } diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index 9897ebd8bf9d..9cc842303e00 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -421,13 +421,21 @@ class TS3Downloader: public TActorBootstrapped { } } + TChecksumState GetChecksumState() const { + TChecksumState checksumState; + if (Checksum) { + checksumState = Checksum->GetState(); + } + return checksumState; + } + void Handle(TEvDataShard::TEvS3DownloadInfo::TPtr& ev) { IMPORT_LOG_D("Handle " << ev->Get()->ToString()); const auto& info = ev->Get()->Info; if (!info.DataETag) { Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(TxId, { - ETag, ProcessedBytes, WrittenBytes, WrittenRows + ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState() })); return; } @@ -447,11 +455,15 @@ class TS3Downloader: public TActorBootstrapped { ProcessedBytes = info.ProcessedBytes; WrittenBytes = info.WrittenBytes; WrittenRows = info.WrittenRows; + if (Checksum) { + Checksum->Continue(info.ChecksumState); + } if (!ContentLength || ProcessedBytes >= ContentLength) { - if (CheckChecksum()) { - return Finish(); + if (!CheckChecksum()) { + return; } + return Finish(); } Process(); @@ -491,7 +503,7 @@ class TS3Downloader: public TActorBootstrapped { } const auto contentLength = result.GetResult().GetContentLength(); - const auto checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, CompressionCodec)); + const auto checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)); GetObject(checksumKey, std::make_pair(0, contentLength - 1)); } @@ -603,7 +615,7 @@ class TS3Downloader: public TActorBootstrapped { << ", size# " << record->ByteSizeLong()); Send(DataShard, new TEvDataShard::TEvS3UploadRowsRequest(TxId, record, { - ETag, ProcessedBytes, WrittenBytes, WrittenRows + ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState() })); } @@ -703,13 +715,14 @@ class TS3Downloader: public TActorBootstrapped { return true; } - TString gotChecksum = Checksum->Serialize(); + TString gotChecksum = Checksum->Finalize(); if (gotChecksum == ExpectedChecksum) { return true; } - const TString error = TStringBuilder() << "Checksum mismatch:" - << ": expected# " << ExpectedChecksum + const TString error = TStringBuilder() << "Checksum mismatch for " + << Settings.GetDataKey(DataFormat, ECompressionCodec::None) + << " expected# " << ExpectedChecksum << ", got# " << gotChecksum; IMPORT_LOG_E(error); diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 729e5b00d038..7fe783bb8682 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -129,7 +129,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName()); - exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableExportChecksums(); + exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableChecksumsExport(); exportInfo->EnablePermissions = AppData()->FeatureFlags.GetEnablePermissionsExport(); TString explain; if (!FillItems(exportInfo, settings, explain)) { diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp index 30b63428d47f..f8cbdc55361a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp @@ -170,8 +170,8 @@ THolder RestorePropose( restoreSettings.SetRegion(region); } - if (item.Metadata.HasVersion()) { - task.SetValidateChecksums(item.Metadata.GetVersion() > 0 && !importInfo->Settings.skip_checksum_validation()); + if (!item.Metadata.HasVersion() || item.Metadata.GetVersion() > 0) { + task.SetValidateChecksums(!importInfo->Settings.skip_checksum_validation()); } } break; diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp index 1a311fb5262c..c5479ea762c8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp @@ -133,7 +133,7 @@ class TSchemeGetter: public TActorBootstrapped { } const auto contentLength = result.GetResult().GetContentLength(); - GetObject(ChecksumKey, std::make_pair(0, contentLength - 1)); + GetObject(NBackup::ChecksumKey(CurrentObjectKey), std::make_pair(0, contentLength - 1)); } void HandleChangefeed(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) { @@ -197,12 +197,10 @@ class TSchemeGetter: public TActorBootstrapped { item.Metadata = NBackup::TMetadata::Deserialize(msg.Body); - if (!item.Metadata.HasVersion()) { - return Reply(false, "Metadata is corrupted: no version"); + if (item.Metadata.HasVersion() && item.Metadata.GetVersion() == 0) { + NeedValidateChecksums = false; } - NeedValidateChecksums = item.Metadata.GetVersion() > 0 && !SkipChecksumValidation; - auto nextStep = [this]() { StartDownloadingScheme(); }; @@ -305,10 +303,10 @@ class TSchemeGetter: public TActorBootstrapped { } TString expectedChecksum = msg.Body.substr(0, msg.Body.find(' ')); - if (expectedChecksum != Checksum) { - return Reply(false, TStringBuilder() << "Checksum mismatch for " << ChecksumKey + if (expectedChecksum != CurrentObjectChecksum) { + return Reply(false, TStringBuilder() << "Checksum mismatch for " << CurrentObjectKey << " expected# " << expectedChecksum - << ", got# " << Checksum); + << ", got# " << CurrentObjectChecksum); } ChecksumValidatedCallback(); @@ -506,7 +504,7 @@ class TSchemeGetter: public TActorBootstrapped { } void DownloadChecksum() { - Download(ChecksumKey); + Download(NBackup::ChecksumKey(CurrentObjectKey)); } void DownloadChangefeeds() { @@ -536,8 +534,8 @@ class TSchemeGetter: public TActorBootstrapped { } void StartValidatingChecksum(const TString& key, const TString& object, std::function checksumValidatedCallback) { - ChecksumKey = NBackup::ChecksumKey(key); - Checksum = NBackup::ComputeChecksum(object); + CurrentObjectKey = key; + CurrentObjectChecksum = NBackup::ComputeChecksum(object); ChecksumValidatedCallback = checksumValidatedCallback; ResetRetries(); @@ -556,7 +554,7 @@ class TSchemeGetter: public TActorBootstrapped { , PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx)) , Retries(importInfo->Settings.number_of_retries()) , NeedDownloadPermissions(!importInfo->Settings.no_acl()) - , SkipChecksumValidation(importInfo->Settings.skip_checksum_validation()) + , NeedValidateChecksums(!importInfo->Settings.skip_checksum_validation()) { } @@ -648,11 +646,10 @@ class TSchemeGetter: public TActorBootstrapped { TActorId Client; - const bool SkipChecksumValidation = false; bool NeedValidateChecksums = true; - TString Checksum; - TString ChecksumKey; + TString CurrentObjectChecksum; + TString CurrentObjectKey; std::function ChecksumValidatedCallback; }; // TSchemeGetter diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 763ea5ea509e..ccdf04e11dab 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -2354,7 +2354,7 @@ partitioning_settings { Y_UNIT_TEST(Checksums) { TTestBasicRuntime runtime; - TTestEnv env(runtime, TTestEnvOptions().EnablePermissionsExport(true)); + TTestEnv env(runtime, TTestEnvOptions().EnablePermissionsExport(true).EnableChecksumsExport(true)); ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"( @@ -2405,7 +2405,7 @@ partitioning_settings { Y_UNIT_TEST(EnableChecksumsPersistance) { TTestBasicRuntime runtime; - TTestEnv env(runtime); + TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true)); ui64 txId = 100; // Create test table @@ -2474,7 +2474,7 @@ partitioning_settings { Y_UNIT_TEST(ChecksumsWithCompression) { TTestBasicRuntime runtime; - TTestEnv env(runtime); + TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true)); ui64 txId = 100; TestCreateTable(runtime, ++txId, "/MyRoot", R"( @@ -2634,7 +2634,7 @@ attributes { } )", port); - TTestEnv env(runtime); + TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true)); Run(runtime, env, TVector{ R"( Name: "Table" diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index ebe225e7e6a4..62f67bf21940 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -606,7 +606,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableParameterizedDecimal(opts.EnableParameterizedDecimal_); app.SetEnableTopicAutopartitioningForCDC(opts.EnableTopicAutopartitioningForCDC_); app.SetEnableBackupService(opts.EnableBackupService_); - app.SetEnableExportChecksums(true); + app.SetEnableChecksumsExport(opts.EnableChecksumsExport_); app.SetEnableTopicTransfer(opts.EnableTopicTransfer_); app.SetEnablePermissionsExport(opts.EnablePermissionsExport_); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index 4265a19f288e..3e2f44673fc0 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -76,6 +76,7 @@ namespace NSchemeShardUT_Private { OPTION(std::optional, EnableStrictUserManagement, std::nullopt); OPTION(std::optional, EnableDatabaseAdmin, std::nullopt); OPTION(std::optional, EnablePermissionsExport, std::nullopt); + OPTION(std::optional, EnableChecksumsExport, std::nullopt); #undef OPTION }; diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp index ec6cb468cb45..1035a65e7893 100644 --- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp +++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp @@ -1,29 +1,28 @@ #include "ut_helpers/ut_backup_restore_common.h" -#include - +#include #include #include +#include #include #include #include -#include -#include -#include -#include #include +#include +#include +#include +#include #include -#include #include #include #include #include - #include #include +#include #include #include #include @@ -35,6 +34,8 @@ #include #include +#include + using namespace NKikimr::NSchemeShard; using namespace NKikimr::NWrappers::NTestHelpers; using namespace NKikimr; @@ -99,14 +100,46 @@ namespace { return schemeStr; } - struct TTestData { + struct TDataWithChecksum { TString Data; + TString Checksum; + + TDataWithChecksum() = default; + + TDataWithChecksum(TString&& data) + : Data(std::move(data)) + , Checksum(NBackup::ComputeChecksum(Data)) + {} + + TDataWithChecksum(const char* data) + : TDataWithChecksum(TString(data)) + {} + + TDataWithChecksum& operator=(const TString& data) { + Data = data; + Checksum = NBackup::ComputeChecksum(Data); + return *this; + } + + operator TString() const { + return Data; + } + + operator bool() const { + return !Data.empty(); + } + }; + + struct TTestData { + TDataWithChecksum RawData; + TString Data; // RawData after compression/encryption TString YsonStr; EDataFormat DataFormat = EDataFormat::Csv; ECompressionCodec CompressionCodec; - TTestData(TString data, TString ysonStr, ECompressionCodec codec = ECompressionCodec::None) - : Data(std::move(data)) + TTestData(TString csvData, TString ysonStr, ECompressionCodec codec = ECompressionCodec::None) + : RawData(std::move(csvData)) + , Data(RawData) , YsonStr(std::move(ysonStr)) , CompressionCodec(codec) { @@ -140,16 +173,16 @@ namespace { }; struct TImportChangefeed { - TString Changefeed; - TString Topic; + TDataWithChecksum Changefeed; + TDataWithChecksum Topic; }; struct TTestDataWithScheme { - TString Metadata; + TDataWithChecksum Metadata = R"({"version": 0})"; EPathType Type = EPathTypeTable; - TString Scheme; - TString CreationQuery; - TString Permissions; + TDataWithChecksum Scheme; + TDataWithChecksum CreationQuery; + TDataWithChecksum Permissions; TImportChangefeed Changefeed; TVector Data; @@ -257,7 +290,7 @@ namespace { const TTypedScheme& typedScheme, const TVector>& shardsConfig = {{"a", 1}}, const TString& permissions = "", - const TString& metadata = "" + const TString& metadata = R"({"version": 0})" ) { TTestDataWithScheme result; result.Type = typedScheme.Type; @@ -275,7 +308,8 @@ namespace { result.CreationQuery = typedScheme.Scheme; break; case EPathTypeCdcStream: - result.Changefeed = {typedScheme.Scheme, typedScheme.Attributes.GetTopicDescription()}; + result.Changefeed.Changefeed = typedScheme.Scheme; + result.Changefeed.Topic = typedScheme.Attributes.GetTopicDescription(); break; default: UNIT_FAIL("cannot create sample test data for the scheme object type: " << typedScheme.Type); @@ -289,35 +323,62 @@ namespace { THashMap result; for (const auto& [prefix, item] : data) { + bool withChecksum = item.Metadata.Data != R"({"version": 0})"; + + auto metadataKey = prefix + "/metadata.json"; + result.emplace(metadataKey, item.Metadata); + if (withChecksum) { + result.emplace(NBackup::ChecksumKey(metadataKey), item.Metadata.Checksum); + } + switch (item.Type) { - case EPathTypeTable: - result.emplace(prefix + "/scheme.pb", item.Scheme); + case EPathTypeTable: { + auto schemeKey = prefix + "/scheme.pb"; + result.emplace(schemeKey, item.Scheme); + if (withChecksum) { + result.emplace(NBackup::ChecksumKey(schemeKey), item.Scheme.Checksum); + } break; - case EPathTypeView: - result.emplace(prefix + "/create_view.sql", item.CreationQuery); + } + case EPathTypeView: { + auto viewKey = prefix + "/create_view.sql"; + result.emplace(viewKey, item.CreationQuery); + if (withChecksum) { + result.emplace(NBackup::ChecksumKey(viewKey), item.CreationQuery.Checksum); + } break; - case EPathTypeCdcStream: - result.emplace(prefix + "/changefeed_description.pb", item.Changefeed.Changefeed); - result.emplace(prefix + "/topic_description.pb", item.Changefeed.Topic); + } + case EPathTypeCdcStream: { + auto changefeedKey = prefix + "/changefeed_description.pb"; + auto topicKey = prefix + "/topic_description.pb"; + result.emplace(changefeedKey, item.Changefeed.Changefeed); + result.emplace(topicKey, item.Changefeed.Topic); + if (withChecksum) { + result.emplace(NBackup::ChecksumKey(changefeedKey), item.Changefeed.Changefeed.Checksum); + result.emplace(NBackup::ChecksumKey(topicKey), item.Changefeed.Topic.Checksum); + } break; + } default: UNIT_FAIL("cannot determine key for the scheme object type: " << item.Type); return {}; } - if (item.Metadata) { - result.emplace(prefix + "/metadata.json", item.Metadata); - } else { - result.emplace(prefix + "/metadata.json", R"({"version": 0})"); // without checksums - } - if (item.Permissions) { - result.emplace(prefix + "/permissions.pb", item.Permissions); + auto permissionsKey = prefix + "/permissions.pb"; + result.emplace(permissionsKey, item.Permissions); + if (withChecksum) { + result.emplace(NBackup::ChecksumKey(permissionsKey), item.Permissions.Checksum); + } } for (ui32 i = 0; i < item.Data.size(); ++i) { const auto& data = item.Data.at(i); result.emplace(Sprintf("%s/data_%02d%s", prefix.data(), i, data.Ext().c_str()), data.Data); + if (withChecksum) { + auto rawDataKey = Sprintf("%s/data_%02d.csv", prefix.data(), i); + result.emplace(NBackup::ChecksumKey(rawDataKey), data.RawData.Checksum); + } } } @@ -1857,6 +1918,248 @@ value { TestGetImport(runtime, txId, "/MyRoot"); } + Y_UNIT_TEST_WITH_COMPRESSION(ExportImportWithChecksums) { + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true)); + + ui64 txId = 100; + + runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + + // Create table + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Original" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + // Upload data + UpdateRow(runtime, "Original", 1, "valueA", TTestTxConfig::FakeHiveTablets); + + // Export table + const char* compression = Codec == ECompressionCodec::Zstd ? "zstd" : ""; + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Original" + destination_prefix: "" + } + compression: "%s" + } + )", port, compression)); + const ui64 exportId = txId; + env.TestWaitNotification(runtime, exportId); + + // Check export + TestGetExport(runtime, exportId, "/MyRoot"); + + UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 6); + + // Restore table + TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "" + destination_path: "/MyRoot/Restored" + } + } + )", port)); + const ui64 importId = txId; + env.TestWaitNotification(runtime, importId); + + // Check import + TestGetImport(runtime, importId, "/MyRoot"); + + // Check data in restored table + { + auto expectedJson = TStringBuilder() << "[[[[" + << "[" + << R"(["1"];)" // key + << R"(["valueA"])" // value + << "];" + << "];\%false]]]"; + auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets + 2, "Restored", {"key", "Uint32", "0"}); + NKqp::CompareYson(expectedJson, content); + } + } + + template + void ExportImportWithCorruption(T corruption) { + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableChecksumsExport(true).EnablePermissionsExport(true)); + + ui64 txId = 100; + + runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + + // Create table + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Original" + Columns { Name: "key" Type: "Uint32" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + // Upload data + UpdateRow(runtime, "Original", 1, "valueA", TTestTxConfig::FakeHiveTablets); + + // Export table + const char* compression = Codec == ECompressionCodec::Zstd ? "zstd" : ""; + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Original" + destination_prefix: "" + } + compression: "%s" + } + )", port, compression)); + const ui64 exportId = txId; + env.TestWaitNotification(runtime, exportId); + + // Check export + TestGetExport(runtime, exportId, "/MyRoot"); + + UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8); + + // Make corruption + corruption(s3Mock.GetData()); + + // Restore corrupted table + TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "" + destination_path: "/MyRoot/Restored1" + } + } + )", port)); + ui64 importId = txId; + env.TestWaitNotification(runtime, importId); + + // Check corrupted import + TestGetImport(runtime, importId, "/MyRoot", Ydb::StatusIds::CANCELLED); + + // Restore corrupted table with skip checksum validation + TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ImportFromS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_prefix: "" + destination_path: "/MyRoot/Restored2" + } + skip_checksum_validation: true + } + )", port)); + importId = txId; + env.TestWaitNotification(runtime, importId); + + // Check corrupted import with skip checksum validation + TestGetImport(runtime, importId, "/MyRoot"); + } + + Y_UNIT_TEST(ExportImportWithMetadataCorruption) { + ExportImportWithCorruption([](auto& s3){ + s3["/metadata.json"] = "corrupted"; + }); + } + + Y_UNIT_TEST(ExportImportWithSchemeCorruption) { + ExportImportWithCorruption([](auto& s3){ + s3["/scheme.pb"] = std::regex_replace(std::string(s3["/scheme.pb"]), std::regex("value"), "val"); + }); + } + + Y_UNIT_TEST(ExportImportWithPermissionsCorruption) { + ExportImportWithCorruption([](auto& s3){ + s3["/permissions.pb"] = std::regex_replace(std::string(s3["/permissions.pb"]), std::regex("root"), "alice"); + }); + } + + Y_UNIT_TEST_WITH_COMPRESSION(ExportImportWithDataCorruption) { + ExportImportWithCorruption([](auto& s3){ + s3["/data_00.csv"] = std::regex_replace(std::string(s3["/data_00.csv"]), std::regex("valueA"), "valueB"); + }); + } + + Y_UNIT_TEST(ExportImportWithMetadataChecksumCorruption) { + ExportImportWithCorruption([](auto& s3){ + s3["/metadata.json.sha256"] = "corrupted"; + }); + } + + Y_UNIT_TEST(ExportImportWithSchemeChecksumCorruption) { + ExportImportWithCorruption([](auto& s3){ + s3["/scheme.pb.sha256"] = "corrupted"; + }); + } + + Y_UNIT_TEST(ExportImportWithPermissionsChecksumCorruption) { + ExportImportWithCorruption([](auto& s3){ + s3["/permissions.pb.sha256"] = "corrupted"; + }); + } + + Y_UNIT_TEST_WITH_COMPRESSION(ExportImportWithDataChecksumCorruption) { + ExportImportWithCorruption([](auto& s3){ + s3["/data_00.csv.sha256"] = "corrupted"; + }); + } + + Y_UNIT_TEST(ExportImportWithMetadataChecksumAbsence) { + ExportImportWithCorruption([](auto& s3){ + s3.erase("/metadata.json.sha256"); + }); + } + + Y_UNIT_TEST(ExportImportWithSchemeChecksumAbsence) { + ExportImportWithCorruption([](auto& s3){ + s3.erase("/scheme.pb.sha256"); + }); + } + + Y_UNIT_TEST(ExportImportWithPermissionsChecksumAbsence) { + ExportImportWithCorruption([](auto& s3){ + s3.erase("/permissions.pb.sha256"); + }); + } + + Y_UNIT_TEST_WITH_COMPRESSION(ExportImportWithDataChecksumAbsence) { + ExportImportWithCorruption([](auto& s3){ + s3.erase("/data_00.csv.sha256"); + }); + } + Y_UNIT_TEST_WITH_COMPRESSION(ShouldCountWrittenBytesAndRows) { TTestBasicRuntime runtime; TTestEnv env(runtime); @@ -4410,139 +4713,6 @@ Y_UNIT_TEST_SUITE(TImportTests) { }); } - Y_UNIT_TEST(CorruptedMetadata) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); - ui64 txId = 100; - - const auto metadata = R"( - corrupted - )"; - - const auto data = GenerateTestData(R"( - columns { - name: "key" - type { optional_type { item { type_id: UTF8 } } } - } - columns { - name: "value" - type { optional_type { item { type_id: UTF8 } } } - } - primary_key: "key" - )", {{"a", 1}}, "", metadata); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( - ImportFromS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_prefix: "" - destination_path: "/MyRoot/Table" - } - } - )", port)); - env.TestWaitNotification(runtime, txId); - - auto desc = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED); - auto entry = desc.GetResponse().GetEntry(); - UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED); - } - - Y_UNIT_TEST(NoDataChecksums) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); - ui64 txId = 100; - - const auto metadata = R"({ - "version": 1 - })"; - - const auto data = GenerateTestData(R"( - columns { - name: "key" - type { optional_type { item { type_id: UTF8 } } } - } - columns { - name: "value" - type { optional_type { item { type_id: UTF8 } } } - } - primary_key: "key" - )", {{"a", 1}}, "", metadata); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( - ImportFromS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_prefix: "" - destination_path: "/MyRoot/Table" - } - } - )", port)); - env.TestWaitNotification(runtime, txId); - - auto desc = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED); - auto entry = desc.GetResponse().GetEntry(); - UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED); - } - - Y_UNIT_TEST(SkipChecksumValidation) { - TTestBasicRuntime runtime; - TTestEnv env(runtime); - ui64 txId = 100; - - const auto metadata = R"( - corrupted - )"; - - const auto data = GenerateTestData(R"( - columns { - name: "key" - type { optional_type { item { type_id: UTF8 } } } - } - columns { - name: "value" - type { optional_type { item { type_id: UTF8 } } } - } - primary_key: "key" - )", {{"a", 1}}, "", metadata); - - TPortManager portManager; - const ui16 port = portManager.GetPort(); - - TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); - UNIT_ASSERT(s3Mock.Start()); - - TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"( - ImportFromS3Settings { - endpoint: "localhost:%d" - scheme: HTTP - items { - source_prefix: "" - destination_path: "/MyRoot/Table" - } - skip_checksum_validation: true - } - )", port)); - env.TestWaitNotification(runtime, txId); - - auto desc = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED); - auto entry = desc.GetResponse().GetEntry(); - UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED); - } - Y_UNIT_TEST(ShouldBlockMerge) { TTestBasicRuntime runtime; TTestEnv env(runtime); @@ -5109,6 +5279,48 @@ Y_UNIT_TEST_SUITE(TImportWithRebootsTests) { )"); } + Y_UNIT_TEST(ShouldSucceedOnTableWithChecksum) { + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + auto data = GenerateTestData(R"( + columns { + name: "key" + type { optional_type { item { type_id: UTF8 } } } + } + columns { + name: "value" + type { optional_type { item { type_id: UTF8 } } } + } + primary_key: "key" + )" , {{"a", 1}}, "", R"({"version": 1})"); + + TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + + runtime.SetLogPriority(NKikimrServices::DATASHARD_RESTORE, NActors::NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::IMPORT, NActors::NLog::PRI_TRACE); + } + + const ui64 importId = ++t.TxId; + AsyncImport(runtime, importId, "/MyRoot", Sprintf(DefaultImportSettings.data(), port)); + t.TestEnv->TestWaitNotification(runtime, importId); + + { + TInactiveZone inactive(activeZone); + TestGetImport(runtime, importId, "/MyRoot", { + Ydb::StatusIds::SUCCESS, + Ydb::StatusIds::NOT_FOUND + }); + } + }); + } + Y_UNIT_TEST(ShouldSucceedOnIndexedTable) { ShouldSucceed(R"( columns { diff --git a/ydb/core/wrappers/ut_helpers/s3_mock.h b/ydb/core/wrappers/ut_helpers/s3_mock.h index 8f7fa0e7275c..56776b11141a 100644 --- a/ydb/core/wrappers/ut_helpers/s3_mock.h +++ b/ydb/core/wrappers/ut_helpers/s3_mock.h @@ -71,6 +71,7 @@ class TS3Mock: public THttpServer::ICallBack { const char* GetError(); const THashMap& GetData() const { return Data; } + THashMap& GetData() { return Data; } private: const TSettings Settings; diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 96f21d41014d..330d6a02d418 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -951,6 +951,11 @@ 1 ], "ColumnsAdded": [ + { + "ColumnId": 7, + "ColumnName": "ChecksumState", + "ColumnType": "String" + }, { "ColumnId": 1, "ColumnName": "TxId", @@ -986,6 +991,7 @@ "ColumnFamilies": { "0": { "Columns": [ + 7, 1, 2, 3,