diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp index f1d8f00016dd..b8c53276c24c 100644 --- a/ydb/core/tx/replication/service/transfer_writer.cpp +++ b/ydb/core/tx/replication/service/transfer_writer.cpp @@ -59,8 +59,8 @@ struct TOutputType { class TMessageOutputSpec : public NYql::NPureCalc::TOutputSpecBase { public: - explicit TMessageOutputSpec(const TVector& tableColumns, const NYT::TNode& schema) - : TableColumns(tableColumns) + explicit TMessageOutputSpec(const TScheme& tableScheme, const NYT::TNode& schema) + : TableScheme(tableScheme) , Schema(schema) {} @@ -69,12 +69,12 @@ class TMessageOutputSpec : public NYql::NPureCalc::TOutputSpecBase { return Schema; } - const TVector GetTableColumns() const { - return TableColumns; + const TVector& GetTableColumns() const { + return TableScheme.ColumnsMetadata; } private: - const TVector TableColumns; + const TScheme TableScheme; const NYT::TNode Schema; }; @@ -105,6 +105,15 @@ class TOutputListImpl final: public IStream { } Out.Value = value.GetElement(0); + + const auto& columns = OutputSpec.GetTableColumns(); + for (size_t i = 0; i < columns.size(); ++i) { + const auto& column = columns[i]; + if (column.GetNotNull() && !Out.Value.GetElement(i)) { + throw yexception() << "The value of the '" << column.GetName() << "' column must be non-NULL"; + } + } + Out.Data.PushRow(&Out.Value, 1); return &Out; @@ -191,11 +200,11 @@ class TProgramHolder : public NFq::IProgramHolder { public: TProgramHolder( - const TVector& tableColumns, + const TScheme& tableScheme, const TString& sql ) : TopicColumns() - , TableColumns(tableColumns) + , TableScheme(tableScheme) , Sql(sql) {} @@ -205,7 +214,7 @@ class TProgramHolder : public NFq::IProgramHolder { // allocated on another allocator and should be released Program = programFactory->MakePullListProgram( NYdb::NTopic::NPurecalc::TMessageInputSpec(), - TMessageOutputSpec(TableColumns, MakeOutputSchema(TableColumns)), + TMessageOutputSpec(TableScheme, MakeOutputSchema(TableScheme.TableColumns)), Sql, NYql::NPureCalc::ETranslationMode::SQL ); @@ -217,7 +226,7 @@ class TProgramHolder : public NFq::IProgramHolder { private: const TVector TopicColumns; - const TVector TableColumns; + const TScheme TableScheme; const TString Sql; THolder> Program; @@ -239,10 +248,11 @@ TScheme BuildScheme(const TAutoPtr& nav) { result.TableColumns.resize(keyColumns); for (const auto& [_, column] : entry.Columns) { + auto notNull = entry.NotNullColumns.contains(column.Name); if (column.KeyOrder >= 0) { - result.TableColumns[column.KeyOrder] = {column.Name, column.Id, column.PType, column.KeyOrder >= 0, !column.IsNotNullColumn}; + result.TableColumns[column.KeyOrder] = {column.Name, column.Id, column.PType, column.KeyOrder >= 0, !notNull}; } else { - result.TableColumns.emplace_back(column.Name, column.Id, column.PType, column.KeyOrder >= 0, !column.IsNotNullColumn); + result.TableColumns.emplace_back(column.Name, column.Id, column.PType, column.KeyOrder >= 0, !notNull); } } @@ -260,6 +270,7 @@ TScheme BuildScheme(const TAutoPtr& nav) { c.SetName(column.Name); c.SetId(column.Id); c.SetTypeId(column.PType.GetTypeId()); + c.SetNotNull(entry.NotNullColumns.contains(column.Name)); if (NScheme::NTypeIds::IsParametrizedType(column.PType.GetTypeId())) { NScheme::ProtoFromTypeInfo(column.PType, "", *c.MutableTypeInfo()); @@ -299,8 +310,8 @@ class ITableKindState { virtual TString Handle(TEvents::TEvCompleted::TPtr& ev) = 0; - const TVector& GetTableColumns() const { - return Scheme.TableColumns; + const TScheme& GetScheme() const { + return Scheme; } protected: @@ -521,7 +532,7 @@ class TTransferWriter LOG_D("CompileTransferLambda: worker# " << Worker); NFq::TPurecalcCompileSettings settings = {}; - auto programHolder = MakeIntrusive(TableState->GetTableColumns(), GenerateSql()); + auto programHolder = MakeIntrusive(TableState->GetScheme(), GenerateSql()); auto result = std::make_unique(std::move(programHolder), settings); Send(CompileServiceId, result.release(), 0, ++InFlightCompilationId); @@ -652,12 +663,12 @@ class TTransferWriter } } catch (const yexception& e) { ProcessingErrorStatus = TEvWorker::TEvGone::EStatus::SCHEME_ERROR; - ProcessingError = TStringBuilder() << "Error transform message: " << e.what(); + ProcessingError = TStringBuilder() << "Error transform message partition " << partitionId << " offset " << message.GetOffset() << ": " << e.what(); break; } } - if (TableState->BatchSize() >= BatchSizeBytes || *LastWriteTime < TInstant::Now() - FlushInterval) { + if (!ProcessingError && (TableState->BatchSize() >= BatchSizeBytes || *LastWriteTime < TInstant::Now() - FlushInterval)) { if (TableState->Flush()) { LastWriteTime.reset(); return Become(&TThis::StateWrite); diff --git a/ydb/tests/functional/replication/transfer.cpp b/ydb/tests/functional/replication/transfer.cpp index eb090031dfbd..7da37b308bfe 100644 --- a/ydb/tests/functional/replication/transfer.cpp +++ b/ydb/tests/functional/replication/transfer.cpp @@ -478,6 +478,74 @@ Y_UNIT_TEST_SUITE(Transfer) }); } + Y_UNIT_TEST(NullToKeyColumn) + { + MainTestCase testCase; + + testCase.CreateTable(R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )"); + testCase.CreateTopic(1); + testCase.CreateTransfer(R"( + $l = ($x) -> { + return [ + <| + Key:NULL, + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )"); + + testCase.Write({"Message-1"}); + + testCase.CheckTransferStateError("Error transform message partition 0 offset 0: The value of the 'Key' column must be non-NULL"); + + testCase.DropTransfer(); + testCase.DropTable(); + testCase.DropTopic(); + } + + Y_UNIT_TEST(NullToColumn) + { + MainTestCase testCase; + + testCase.CreateTable(R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8 NOT NULL, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )"); + testCase.CreateTopic(1); + testCase.CreateTransfer(R"( + $l = ($x) -> { + return [ + <| + Key:$x._offset, + Message:NULL + |> + ]; + }; + )"); + + testCase.Write({"Message-1"}); + + testCase.CheckTransferStateError("Error transform message partition 0 offset 0: The value of the 'Message' column must be non-NULL"); + + testCase.DropTransfer(); + testCase.DropTable(); + testCase.DropTopic(); + } + Y_UNIT_TEST(DropTransfer) { MainTestCase testCase; @@ -878,6 +946,46 @@ Y_UNIT_TEST_SUITE(Transfer) testCase.DropTable(); } + Y_UNIT_TEST(TransferSourceDropped) + { + MainTestCase testCase; + testCase.CreateTable(R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )"); + + testCase.CreateTopic(1); + + testCase.CreateTransfer(R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )"); + + testCase.Write({"Message-1"}); + + testCase.CheckResult({{ + _C("Message", TString("Message-1")) + }}); + + testCase.DropTopic(); + + testCase.CheckTransferStateError("Discovery for all topics failed. The last error was: no path 'local/Topic_"); + + testCase.DropTransfer(); + testCase.DropTable(); + } + Y_UNIT_TEST(CreateTransferSourceIsNotTopic) { MainTestCase testCase;