Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions ydb/core/tx/replication/service/transfer_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ struct TOutputType {

class TMessageOutputSpec : public NYql::NPureCalc::TOutputSpecBase {
public:
explicit TMessageOutputSpec(const TVector<TSchemaColumn>& tableColumns, const NYT::TNode& schema)
: TableColumns(tableColumns)
explicit TMessageOutputSpec(const TScheme& tableScheme, const NYT::TNode& schema)
: TableScheme(tableScheme)
, Schema(schema)
{}

Expand All @@ -69,12 +69,12 @@ class TMessageOutputSpec : public NYql::NPureCalc::TOutputSpecBase {
return Schema;
}

const TVector<TSchemaColumn> GetTableColumns() const {
return TableColumns;
const TVector<NKikimrKqp::TKqpColumnMetadataProto>& GetTableColumns() const {
return TableScheme.ColumnsMetadata;
}

private:
const TVector<TSchemaColumn> TableColumns;
const TScheme TableScheme;
const NYT::TNode Schema;
};

Expand Down Expand Up @@ -105,6 +105,15 @@ class TOutputListImpl final: public IStream<TOutputType*> {
}

Out.Value = value.GetElement(0);

const auto& columns = OutputSpec.GetTableColumns();
for (size_t i = 0; i < columns.size(); ++i) {
const auto& column = columns[i];
if (column.GetNotNull() && !Out.Value.GetElement(i)) {
throw yexception() << "The value of the '" << column.GetName() << "' column must be non-NULL";
}
}

Out.Data.PushRow(&Out.Value, 1);

return &Out;
Expand Down Expand Up @@ -191,11 +200,11 @@ class TProgramHolder : public NFq::IProgramHolder {

public:
TProgramHolder(
const TVector<TSchemaColumn>& tableColumns,
const TScheme& tableScheme,
const TString& sql
)
: TopicColumns()
, TableColumns(tableColumns)
, TableScheme(tableScheme)
, Sql(sql)
{}

Expand All @@ -205,7 +214,7 @@ class TProgramHolder : public NFq::IProgramHolder {
// allocated on another allocator and should be released
Program = programFactory->MakePullListProgram(
NYdb::NTopic::NPurecalc::TMessageInputSpec(),
TMessageOutputSpec(TableColumns, MakeOutputSchema(TableColumns)),
TMessageOutputSpec(TableScheme, MakeOutputSchema(TableScheme.TableColumns)),
Sql,
NYql::NPureCalc::ETranslationMode::SQL
);
Expand All @@ -217,7 +226,7 @@ class TProgramHolder : public NFq::IProgramHolder {

private:
const TVector<TSchemaColumn> TopicColumns;
const TVector<TSchemaColumn> TableColumns;
const TScheme TableScheme;
const TString Sql;

THolder<NYql::NPureCalc::TPullListProgram<NYdb::NTopic::NPurecalc::TMessageInputSpec, TMessageOutputSpec>> Program;
Expand All @@ -239,10 +248,11 @@ TScheme BuildScheme(const TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& nav) {
result.TableColumns.resize(keyColumns);

for (const auto& [_, column] : entry.Columns) {
auto notNull = entry.NotNullColumns.contains(column.Name);
if (column.KeyOrder >= 0) {
result.TableColumns[column.KeyOrder] = {column.Name, column.Id, column.PType, column.KeyOrder >= 0, !column.IsNotNullColumn};
result.TableColumns[column.KeyOrder] = {column.Name, column.Id, column.PType, column.KeyOrder >= 0, !notNull};
} else {
result.TableColumns.emplace_back(column.Name, column.Id, column.PType, column.KeyOrder >= 0, !column.IsNotNullColumn);
result.TableColumns.emplace_back(column.Name, column.Id, column.PType, column.KeyOrder >= 0, !notNull);
}
}

Expand All @@ -260,6 +270,7 @@ TScheme BuildScheme(const TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& nav) {
c.SetName(column.Name);
c.SetId(column.Id);
c.SetTypeId(column.PType.GetTypeId());
c.SetNotNull(entry.NotNullColumns.contains(column.Name));

if (NScheme::NTypeIds::IsParametrizedType(column.PType.GetTypeId())) {
NScheme::ProtoFromTypeInfo(column.PType, "", *c.MutableTypeInfo());
Expand Down Expand Up @@ -299,8 +310,8 @@ class ITableKindState {

virtual TString Handle(TEvents::TEvCompleted::TPtr& ev) = 0;

const TVector<TSchemaColumn>& GetTableColumns() const {
return Scheme.TableColumns;
const TScheme& GetScheme() const {
return Scheme;
}

protected:
Expand Down Expand Up @@ -521,7 +532,7 @@ class TTransferWriter
LOG_D("CompileTransferLambda: worker# " << Worker);

NFq::TPurecalcCompileSettings settings = {};
auto programHolder = MakeIntrusive<TProgramHolder>(TableState->GetTableColumns(), GenerateSql());
auto programHolder = MakeIntrusive<TProgramHolder>(TableState->GetScheme(), GenerateSql());
auto result = std::make_unique<NFq::TEvRowDispatcher::TEvPurecalcCompileRequest>(std::move(programHolder), settings);

Send(CompileServiceId, result.release(), 0, ++InFlightCompilationId);
Expand Down Expand Up @@ -652,12 +663,12 @@ class TTransferWriter
}
} catch (const yexception& e) {
ProcessingErrorStatus = TEvWorker::TEvGone::EStatus::SCHEME_ERROR;
ProcessingError = TStringBuilder() << "Error transform message: " << e.what();
ProcessingError = TStringBuilder() << "Error transform message partition " << partitionId << " offset " << message.GetOffset() << ": " << e.what();
break;
}
}

if (TableState->BatchSize() >= BatchSizeBytes || *LastWriteTime < TInstant::Now() - FlushInterval) {
if (!ProcessingError && (TableState->BatchSize() >= BatchSizeBytes || *LastWriteTime < TInstant::Now() - FlushInterval)) {
if (TableState->Flush()) {
LastWriteTime.reset();
return Become(&TThis::StateWrite);
Expand Down
108 changes: 108 additions & 0 deletions ydb/tests/functional/replication/transfer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,74 @@ Y_UNIT_TEST_SUITE(Transfer)
});
}

// Checks that a transfer whose lambda produces NULL for the NOT NULL primary
// key column is reported as a transfer state error.
Y_UNIT_TEST(NullToKeyColumn)
{
MainTestCase testCase;

// Column-store table: only the key column is declared NOT NULL.
testCase.CreateTable(R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
Message Utf8,
PRIMARY KEY (Key)
) WITH (
STORE = COLUMN
);
)");
// NOTE(review): the argument is presumably the partition count — confirm against MainTestCase.
testCase.CreateTopic(1);
// The lambda always emits Key = NULL, regardless of the incoming message.
testCase.CreateTransfer(R"(
$l = ($x) -> {
return [
<|
Key:NULL,
Message:CAST($x._data AS Utf8)
|>
];
};
)");

testCase.Write({"Message-1"});

// The expected text identifies the partition and offset of the offending
// message and names the column that received NULL.
testCase.CheckTransferStateError("Error transform message partition 0 offset 0: The value of the 'Key' column must be non-NULL");

// Remove everything this test created.
testCase.DropTransfer();
testCase.DropTable();
testCase.DropTopic();
}

// Checks that a transfer whose lambda produces NULL for a NOT NULL non-key
// column is reported as a transfer state error.
Y_UNIT_TEST(NullToColumn)
{
MainTestCase testCase;

// Column-store table: both the key and the Message column are NOT NULL.
testCase.CreateTable(R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
Message Utf8 NOT NULL,
PRIMARY KEY (Key)
) WITH (
STORE = COLUMN
);
)");
// NOTE(review): the argument is presumably the partition count — confirm against MainTestCase.
testCase.CreateTopic(1);
// The key is taken from the message offset; Message is always NULL.
testCase.CreateTransfer(R"(
$l = ($x) -> {
return [
<|
Key:$x._offset,
Message:NULL
|>
];
};
)");

testCase.Write({"Message-1"});

// The expected text identifies the partition and offset of the offending
// message and names the column that received NULL.
testCase.CheckTransferStateError("Error transform message partition 0 offset 0: The value of the 'Message' column must be non-NULL");

// Remove everything this test created.
testCase.DropTransfer();
testCase.DropTable();
testCase.DropTopic();
}

Y_UNIT_TEST(DropTransfer)
{
MainTestCase testCase;
Expand Down Expand Up @@ -878,6 +946,46 @@ Y_UNIT_TEST_SUITE(Transfer)
testCase.DropTable();
}

// Checks that dropping the source topic of a working transfer is reported as
// a transfer state error.
Y_UNIT_TEST(TransferSourceDropped)
{
MainTestCase testCase;
testCase.CreateTable(R"(
CREATE TABLE `%s` (
Key Uint64 NOT NULL,
Message Utf8,
PRIMARY KEY (Key)
) WITH (
STORE = COLUMN
);
)");

// NOTE(review): the argument is presumably the partition count — confirm against MainTestCase.
testCase.CreateTopic(1);

// Valid lambda: key comes from the message offset, Message from the payload.
testCase.CreateTransfer(R"(
$l = ($x) -> {
return [
<|
Key:CAST($x._offset AS Uint64),
Message:CAST($x._data AS Utf8)
|>
];
};
)");

testCase.Write({"Message-1"});

// Confirm the transfer delivers the message before the topic is removed.
testCase.CheckResult({{
_C("Message", TString("Message-1"))
}});

// Drop the source while the transfer still exists.
testCase.DropTopic();

// The expected text stops after the 'local/Topic_' path prefix.
// NOTE(review): presumably matched as a substring/prefix because the full
// topic name is generated per test — confirm in CheckTransferStateError.
testCase.CheckTransferStateError("Discovery for all topics failed. The last error was: no path 'local/Topic_");

// The topic was already dropped above, so only the transfer and table remain.
testCase.DropTransfer();
testCase.DropTable();
}

Y_UNIT_TEST(CreateTransferSourceIsNotTopic)
{
MainTestCase testCase;
Expand Down
Loading