Skip to content

Commit

Permalink
Fix kafka with enabled proxy (ydb-platform#2017)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev committed Mar 4, 2024
1 parent efd7138 commit 73047c9
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
5 changes: 5 additions & 0 deletions ydb/core/kafka_proxy/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@

namespace NKafka {

static constexpr int ProxyNodeId = 1;
static constexpr char UnderlayPrefix[] = "u-";

static_assert(sizeof(UnderlayPrefix) == 3);

enum EAuthSteps {
WAIT_HANDSHAKE,
WAIT_AUTH,
Expand Down
12 changes: 10 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) {

bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
if (withProxy) {
SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), -1, ctx);
SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), NKafka::ProxyNodeId, ctx);
return;
}

Expand Down Expand Up @@ -54,6 +54,8 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
response->Port = port;
response->NodeId = nodeId;

KAFKA_LOG_D("FIND_COORDINATOR response. Host#: " << host << ", Port#: " << port << ", NodeId# " << nodeId);

Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
Expand All @@ -71,15 +73,21 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons

response->Coordinators.push_back(coordinator);
}


response->ErrorCode = error;

Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}

void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
Y_ABORT_UNLESS(!iter.IsEnd());

auto host = (*ev->Get()->Nodes)[iter->second].Host;
if (host.StartsWith(UnderlayPrefix)) {
host = host.substr(sizeof(UnderlayPrefix) - 1);
}
KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host);
SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx);
}
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
namespace NKafka {
using namespace NKikimr::NGRpcProxy::V1;

static constexpr int ProxyNodeId = 1;
static constexpr char UnderlayPrefix[] = "u-";

static_assert(sizeof(UnderlayPrefix) == 3);

NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
const ui64 correlationId,
const TMessagePtr<TMetadataRequestData>& message) {
Expand Down

0 comments on commit 73047c9

Please sign in to comment.