Skip to content

Commit

Permalink
kcat fixes (ydb-platform#1586)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed Mar 1, 2024
1 parent b7a3970 commit bf2ea8e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 21 deletions.
28 changes: 14 additions & 14 deletions ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
response->ErrorCode = EKafkaErrors::NONE_ERROR;

AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3});
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS);
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA);
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID);
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE);
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE);
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS);
AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3, .MaxVersion=9});
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS, {.MaxVersion=2});
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA, {.MaxVersion=9});
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID, {.MaxVersion=4});
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE, {.MaxVersion=1});
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE, {.MaxVersion=2});
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS, {.MinVersion=1, .MaxVersion=1});
AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3});
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP);
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP);
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1});
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP, {.MaxVersion=9});
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP, {.MaxVersion=3});
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP, {.MaxVersion=5});
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT, {.MaxVersion=4});
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR, {.MaxVersion=0});
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=0});
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH, {.MaxVersion=8});

return response;
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
response->Coordinators.push_back(coordinator);
}

response->ErrorCode = NONE_ERROR;
response->Host = host;
response->Port = port;
response->NodeId = nodeId;

Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
partition.CommittedOffset = sourcePartition.CommittedOffset;
partition.PartitionIndex = sourcePartition.PartitionIndex;
partition.ErrorCode = sourcePartition.ErrorCode;
topic.Partitions.push_back(partition);
}
response->Topics.push_back(topic);
}
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr

switch (ReadStep) {
case WAIT_JOIN_GROUP: { // join first time
if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType);
if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
Expand Down Expand Up @@ -156,8 +156,8 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
return;
}

if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType);
if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
Expand Down Expand Up @@ -361,9 +361,9 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
for (auto part: finalPartitionsToRead) {
KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part);
topicPartition.Partitions.push_back(part);
assignment.AssignedPartitions.push_back(topicPartition);
partitions.ReadingNow.emplace(part);
}
assignment.AssignedPartitions.push_back(topicPartition);
}

return assignment;
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,20 @@ void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion
}

void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
auto useVarintSize = _version > 3;
_version = ASSIGNMENT_VERSION;

if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment";
}

_writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1);
if (useVarintSize) {
_writable.writeUnsignedVarint(Size(_version) + 1);
} else {
TKafkaInt32 size = Size(_version);
_writable << size;
}

_writable << _version;
NPrivate::TWriteCollector _collector;
NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions);
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kafka_proxy/kafka_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4517,7 +4517,12 @@ i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const {
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
_collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
}
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
auto useVarintSize = _version > 3;
if (useVarintSize) {
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
} else {
return _collector.Size + sizeof(TKafkaInt32);
}
}


Expand Down

0 comments on commit bf2ea8e

Please sign in to comment.