diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp index b6d537aed641..01cee28a259c 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp @@ -26,29 +26,32 @@ struct PartitionOffsets { }; class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor, public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl { using TBase = NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor; public: TTopicOffsetActor(std::shared_ptr> consumers, - const NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest& request, + const NKikimr::NGRpcProxy::V1::TLocalRequestBase& request, const TActorId& requester, - std::shared_ptr> partitions) + std::shared_ptr> partitions, + const TString& originalTopicName, + const TString& userSID) : TBase(request, requester) , TDescribeTopicActorImpl(ConsumerOffsetSettings(consumers, partitions)) , Requester(requester) - , TopicName(request.Topic) + , OriginalTopicName(originalTopicName) + , UserSID(userSID) { - Y_UNUSED(requester); }; void Bootstrap(const NActors::TActorContext& ctx) override { Y_UNUSED(ctx); + KAFKA_LOG_D("TopicOffsetActor: Get commited offsets for topic '" << OriginalTopicName + << "' for user '" << UserSID << "'"); SendDescribeProposeRequest(); Become(&TTopicOffsetActor::StateWork); }; @@ -67,14 +70,18 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override { - Y_UNUSED(error); - Y_UNUSED(errorCode); - Y_UNUSED(status); Y_UNUSED(ctx); + KAFKA_LOG_D("TopicOffsetActor: error raised for '" << OriginalTopicName << "'" + << " for user '" << UserSID << "'." + << " Error: '" << error << "'," + << " ErrorCode: '" << static_cast(errorCode) << "'," + << " StatusCode: '" << status<< "'."); + THolder response(new TEvKafka::TEvCommitedOffsetsResponse()); - response->TopicName = TopicName; - response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::ERROR; + response->TopicName = OriginalTopicName; + response->Status = ConvertErrorCode(status); + Send(Requester, response.Release()); Die(ctx); } @@ -114,10 +121,12 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override { const auto& response = ev->Get()->Request.Get()->ResultSet.front(); + KAFKA_LOG_D("TopicOffsetActor: TEvNavigateKeySetResult recieved for topic '" << OriginalTopicName + << "' for user '" << UserSID << "'. PQGroupInfo is present: " << (response.PQGroupInfo.Get() != nullptr)); if (!response.PQGroupInfo) { THolder response(new TEvKafka::TEvCommitedOffsetsResponse()); - response->TopicName = TopicName; - response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::UNKNOWN_TOPIC; + response->TopicName = OriginalTopicName; + response->Status = UNKNOWN_TOPIC_OR_PARTITION; Send(Requester, response.Release()); TActorBootstrapped::PassAway(); return; @@ -128,17 +137,20 @@ class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor< } void Reply(const TActorContext& ctx) override { + KAFKA_LOG_D("TopicOffsetActor: replying for topic '" << OriginalTopicName + << "' for user '" << UserSID << "'" << " with status NONE_ERROR"); THolder response(new TEvKafka::TEvCommitedOffsetsResponse()); - response->TopicName = TopicName; - response->Status = TEvKafka::TEvCommitedOffsetsResponse::EStatus::OK; + response->TopicName = OriginalTopicName; + response->Status = NONE_ERROR; response->PartitionIdToOffsets = PartitionIdToOffsets; Send(Requester, response.Release()); Die(ctx); }; private: - TActorId Requester; - TString TopicName; + const TActorId Requester; + const TString OriginalTopicName; + const TString UserSID; std::unordered_map PartitionIdToOffset {}; std::shared_ptr>> PartitionIdToOffsets = std::make_shared>>(); }; @@ -156,37 +168,28 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse() TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics topic; TString topicName = requestTopic.Name.value(); topic.Name = topicName; - if (UnknownTopics.contains(topicName)) { + if (TopicsToResponses[topicName]->Status == NONE_ERROR) { + auto partitionsToOffsets = TopicsToResponses[topicName]->PartitionIdToOffsets; for (auto requestPartition: requestTopic.PartitionIndexes) { TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; partition.PartitionIndex = requestPartition; - partition.ErrorCode = UNKNOWN_TOPIC_OR_PARTITION; + if (partitionsToOffsets.get() != nullptr + && partitionsToOffsets->contains(requestPartition) + && (*partitionsToOffsets)[requestPartition].contains(requestGroup.GroupId.value())) { + partition.CommittedOffset = (*partitionsToOffsets)[requestPartition][requestGroup.GroupId.value()]; + partition.ErrorCode = NONE_ERROR; + } else { + partition.ErrorCode = RESOURCE_NOT_FOUND; + } topic.Partitions.push_back(partition); } - group.Topics.push_back(topic); - continue; - } - if (ErroredTopics.contains(topicName)) { + } else { for (auto requestPartition: requestTopic.PartitionIndexes) { TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; partition.PartitionIndex = requestPartition; - partition.ErrorCode = UNKNOWN_SERVER_ERROR; + partition.ErrorCode = TopicsToResponses[topicName]->Status; topic.Partitions.push_back(partition); } - group.Topics.push_back(topic); - continue; - } - auto partitionsToOffsets = TopicToOffsets[topicName]; - for (auto requestPartition: requestTopic.PartitionIndexes) { - TOffsetFetchResponseData::TOffsetFetchResponseGroup::TOffsetFetchResponseTopics::TOffsetFetchResponsePartitions partition; - partition.PartitionIndex = requestPartition; - if (partitionsToOffsets->contains(requestPartition) && (*partitionsToOffsets)[requestPartition].contains(requestGroup.GroupId.value())) { - partition.CommittedOffset = (*partitionsToOffsets)[requestPartition][requestGroup.GroupId.value()]; - partition.ErrorCode = NONE_ERROR; - } else { - partition.ErrorCode = RESOURCE_NOT_FOUND; - } - topic.Partitions.push_back(partition); } group.Topics.push_back(topic); } @@ -210,6 +213,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse() void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { // If API level <= 7, Groups would be empty. In this case we convert message to level 8 and process it uniformely later + KAFKA_LOG_D("TopicOffsetActor: new request for user '" << Context->UserToken->GetUserSID()<< "'"); if (Message->Groups.empty()) { TOffsetFetchRequestData::TOffsetFetchRequestGroup group; group.GroupId = Message->GroupId.value(); @@ -230,15 +234,18 @@ void TKafkaOffsetFetchActor::Bootstrap(const NActors::TActorContext& ctx) { } for (const auto& topicToEntities : TopicToEntities) { - NKikimr::NGRpcProxy::V1::TGetPartitionsLocationRequest locationRequest{}; - locationRequest.Topic = topicToEntities.first; - locationRequest.Token = Context->UserToken->GetSerializedToken(); - locationRequest.Database = Context->DatabasePath; + NKikimr::NGRpcProxy::V1::TLocalRequestBase locationRequest{ + NormalizePath(Context->DatabasePath, topicToEntities.first), + Context->DatabasePath, + Context->UserToken->GetSerializedToken(), + }; ctx.Register(new TTopicOffsetActor( topicToEntities.second.Consumers, locationRequest, SelfId(), - topicToEntities.second.Partitions + topicToEntities.second.Partitions, + topicToEntities.first, + Context->UserToken->GetUserSID() )); InflyTopics++; } @@ -261,14 +268,13 @@ void TKafkaOffsetFetchActor::ExtractPartitions(const TString& group, const NKafk void TKafkaOffsetFetchActor::Handle(TEvKafka::TEvCommitedOffsetsResponse::TPtr& ev, const TActorContext& ctx) { InflyTopics--; - TopicToOffsets[ev->Get()->TopicName] = ev->Get()->PartitionIdToOffsets; - if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::ERROR) { - ErroredTopics.insert(ev->Get()->TopicName); - } else if (ev->Get()->Status == TEvKafka::TEvCommitedOffsetsResponse::UNKNOWN_TOPIC) { - UnknownTopics.insert(ev->Get()->TopicName); - } + + auto eventPtr = ev->Release(); + TopicsToResponses[eventPtr->TopicName] = eventPtr; + if (InflyTopics == 0) { auto response = GetOffsetFetchResponse(); + KAFKA_LOG_D("TopicOffsetActor: sending response to user '" << Context->UserToken->GetUserSID()<< "'"); Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast(response->ErrorCode))); Die(ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h index ec7f1075df7f..f0d1828dcb71 100644 --- a/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.h @@ -36,9 +36,7 @@ class TKafkaOffsetFetchActor: public NActors::TActorBootstrapped Message; std::unordered_map TopicToEntities; - std::unordered_map>>> TopicToOffsets; - std::set UnknownTopics; - std::set ErroredTopics; + std::unordered_map> TopicsToResponses; ui32 InflyTopics = 0; }; diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index c649a2bd3812..ff7126ecee8b 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -211,17 +211,11 @@ struct TEvTopicOffsetsResponse : public NActors::TEventLocal , public NKikimr::NGRpcProxy::V1::TEvPQProxy::TLocalResponseBase { - enum EStatus { - OK, - ERROR, - UNKNOWN_TOPIC, - }; - TEvCommitedOffsetsResponse() {} TString TopicName; - EStatus Status; + EKafkaErrors Status; std::shared_ptr>> PartitionIdToOffsets; }; diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index f01d6951a469..94119c907795 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -1365,6 +1365,20 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1); } + { + // Check with short topic name + std::map> topicsToPartions; + topicsToPartions[shortTopicName] = std::vector{0, 1, 2, 3}; + auto msg = client.OffsetFetch(consumerName, topicsToPartions); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(msg->Groups[0].Topics.size(), 1); + const auto& partitions = msg->Groups[0].Topics[0].Partitions; + UNIT_ASSERT_VALUES_EQUAL(partitions.size(), 4); + auto partition0 = std::find_if(partitions.begin(), partitions.end(), [](const auto& partition) { return partition.PartitionIndex == 0; }); + UNIT_ASSERT_VALUES_UNEQUAL(partition0, partitions.end()); + UNIT_ASSERT_VALUES_EQUAL(partition0->ErrorCode, NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL(partition0->CommittedOffset, 1); + } { // Check with nonexistent topic std::map> topicsToPartions;