diff --git a/ydb/core/kafka_proxy/kafka_records.h b/ydb/core/kafka_proxy/kafka_records.h index 9afa9c07124b..e94c6888affd 100644 --- a/ydb/core/kafka_proxy/kafka_records.h +++ b/ydb/core/kafka_proxy/kafka_records.h @@ -165,7 +165,7 @@ class TKafkaRecord: public TMessage { static constexpr TKafkaVersions PresentVersions = VersionsAlways; static constexpr TKafkaVersions TaggedVersions = VersionsNever; - static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; static constexpr TKafkaVersions FlexibleVersions = VersionsAlways; }; ValueMeta::Type Value; diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 64d1b7ab3c79..23d144090569 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -393,6 +393,24 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { AssertMessageMeta(readMessage, headerKey, headerValue); } + // send empty produce message + { + TKafkaRecordBatch batch; + batch.BaseOffset = 3; + batch.BaseSequence = 5; + batch.Magic = 2; // Current supported + batch.Records.resize(1); + batch.Records[0].Key = TKafkaBytes{}; + batch.Records[0].Value = TKafkaBytes{}; + + auto msg = client.Produce(topicName, 0, batch); + + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, + static_cast(EKafkaErrors::NONE_ERROR)); + } + { // Check short topic name