diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp index 96c628477a5c..dc74d519630a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp @@ -30,21 +30,21 @@ TApiVersionsResponseData::TPtr GetApiVersions() { TApiVersionsResponseData::TPtr response = std::make_shared(); response->ErrorCode = EKafkaErrors::NONE_ERROR; - AddApiKey(response->ApiKeys, PRODUCE, {.MinVersion=3}); - AddApiKey(response->ApiKeys, API_VERSIONS); - AddApiKey(response->ApiKeys, METADATA); - AddApiKey(response->ApiKeys, INIT_PRODUCER_ID); - AddApiKey(response->ApiKeys, SASL_HANDSHAKE); - AddApiKey(response->ApiKeys, SASL_AUTHENTICATE); - AddApiKey(response->ApiKeys, LIST_OFFSETS); + AddApiKey(response->ApiKeys, PRODUCE, {.MinVersion=3, .MaxVersion=9}); + AddApiKey(response->ApiKeys, API_VERSIONS, {.MaxVersion=2}); + AddApiKey(response->ApiKeys, METADATA, {.MaxVersion=9}); + AddApiKey(response->ApiKeys, INIT_PRODUCER_ID, {.MaxVersion=4}); + AddApiKey(response->ApiKeys, SASL_HANDSHAKE, {.MaxVersion=1}); + AddApiKey(response->ApiKeys, SASL_AUTHENTICATE, {.MaxVersion=2}); + AddApiKey(response->ApiKeys, LIST_OFFSETS, {.MinVersion=1, .MaxVersion=1}); AddApiKey(response->ApiKeys, FETCH, {.MaxVersion=3}); - AddApiKey(response->ApiKeys, JOIN_GROUP); - AddApiKey(response->ApiKeys, SYNC_GROUP); - AddApiKey(response->ApiKeys, LEAVE_GROUP); - AddApiKey(response->ApiKeys, HEARTBEAT); - AddApiKey(response->ApiKeys, FIND_COORDINATOR); - AddApiKey(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1}); - AddApiKey(response->ApiKeys, OFFSET_FETCH); + AddApiKey(response->ApiKeys, JOIN_GROUP, {.MaxVersion=9}); + AddApiKey(response->ApiKeys, SYNC_GROUP, {.MaxVersion=3}); + AddApiKey(response->ApiKeys, LEAVE_GROUP, {.MaxVersion=5}); + AddApiKey(response->ApiKeys, HEARTBEAT, {.MaxVersion=4}); + AddApiKey(response->ApiKeys, FIND_COORDINATOR, {.MaxVersion=0}); + AddApiKey(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=0}); + AddApiKey(response->ApiKeys, OFFSET_FETCH, {.MaxVersion=8}); return response; } diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp index 63709d8385c4..c1f473f59f92 100644 --- a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp @@ -49,6 +49,11 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p response->Coordinators.push_back(coordinator); } + response->ErrorCode = NONE_ERROR; + response->Host = host; + response->Port = port; + response->NodeId = nodeId; + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast(response->ErrorCode))); Die(ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp index 01cee28a259c..96893aaa59ce 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp @@ -204,6 +204,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse() partition.CommittedOffset = sourcePartition.CommittedOffset; partition.PartitionIndex = sourcePartition.PartitionIndex; partition.ErrorCode = sourcePartition.ErrorCode; + topic.Partitions.push_back(partition); } response->Topics.push_back(topic); } diff --git a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp index e788f7ceb736..e8f0a2024501 100644 --- a/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp @@ -86,8 +86,8 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr switch (ReadStep) { case WAIT_JOIN_GROUP: { // join first time - if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) { - SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType); + if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) { + SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value()); CloseReadSession(ctx); return; } @@ -156,8 +156,8 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr return; } - if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) { - SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType); + if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) { + SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value()); CloseReadSession(ctx); return; } @@ -361,9 +361,9 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala for (auto part: finalPartitionsToRead) { KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part); topicPartition.Partitions.push_back(part); - assignment.AssignedPartitions.push_back(topicPartition); partitions.ReadingNow.emplace(part); } + assignment.AssignedPartitions.push_back(topicPartition); } return assignment; diff --git a/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp index dd2e52eb7a64..ce54125ffd56 100644 --- a/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp +++ b/ydb/core/kafka_proxy/kafka_consumer_protocol.cpp @@ -143,12 +143,20 @@ void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion } void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + auto useVarintSize = _version > 3; _version = ASSIGNMENT_VERSION; + if (!NPrivate::VersionCheck(_version)) { ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment"; } - _writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1); + if (useVarintSize) { + _writable.writeUnsignedVarint(Size(_version) + 1); + } else { + TKafkaInt32 size = Size(_version); + _writable << size; + } + _writable << _version; NPrivate::TWriteCollector _collector; NPrivate::Write(_collector, _writable, _version, AssignedPartitions); diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 8b08d98ce472..b68f3cd9ba77 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -4517,7 +4517,12 @@ i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const { if (NPrivate::VersionCheck(_version)) { _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); } - return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1); + auto useVarintSize = _version > 3; + if (useVarintSize) { + return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1); + } else { + return _collector.Size + sizeof(TKafkaInt32); + } }