Skip to content

Commit

Permalink
Fix kafka cdc read and login without @ (#10678)
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Oct 22, 2024
1 parent 12559bf commit e96a66e
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 24 deletions.
5 changes: 3 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ struct PartitionOffsets {
class TTopicOffsetActor: public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetActor,
NKikimr::NGRpcProxy::V1::TLocalRequestBase,
NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse>,
public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl {
public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl,
public NKikimr::NGRpcProxy::V1::TCdcStreamCompatible {
using TBase = NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetActor,
NKikimr::NGRpcProxy::V1::TLocalRequestBase,
NKikimr::NGRpcProxy::V1::TEvPQProxy::TEvPartitionLocationResponse>;

public:
TTopicOffsetActor(std::shared_ptr<TSet<TString>> consumers,
const NKikimr::NGRpcProxy::V1::TLocalRequestBase& request,
const TActorId& requester,
const TActorId& requester,
std::shared_ptr<TSet<ui32>> partitions,
const TString& originalTopicName,
const TString& userSID)
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,13 @@ bool TKafkaSaslAuthActor::TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& aut
auto password = tokens[2];
size_t atPos = userAndDatabase.rfind('@');
if (atPos == TString::npos) {
SendResponseAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "Database not provided.", "", ctx);
return false;
authData.UserName = "";
authData.Database = userAndDatabase;
} else {
authData.UserName = userAndDatabase.substr(0, atPos);
authData.Database = userAndDatabase.substr(atPos + 1);
}

authData.UserName = userAndDatabase.substr(0, atPos);
authData.Database = userAndDatabase.substr(atPos + 1);
authData.Password = password;
return true;
}
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/kafka_proxy/actors/kafka_topic_offsets_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ namespace NKafka {
class TTopicOffsetsActor : public NKikimr::NGRpcProxy::V1::TPQInternalSchemaActor<TTopicOffsetsActor,
TEvKafka::TGetOffsetsRequest,
TEvKafka::TEvTopicOffsetsResponse>
, public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl {
, public NKikimr::NGRpcProxy::V1::TDescribeTopicActorImpl
, public NKikimr::NGRpcProxy::V1::TCdcStreamCompatible {

using TBase = TPQInternalSchemaActor<TTopicOffsetsActor,
TEvKafka::TGetOffsetsRequest,
Expand All @@ -30,17 +31,18 @@ using TBase = TPQInternalSchemaActor<TTopicOffsetsActor,
void StateWork(TAutoPtr<IEventHandle>& ev);

void HandleCacheNavigateResponse(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override;

virtual void ApplyResponse(TTabletInfo&, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr&,
const TActorContext&) override {
Y_ABORT();
}

bool ApplyResponse(NKikimr::TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr&, const TActorContext&) override {
Y_ABORT();
}

void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;

void Reply(const TActorContext&) override;

void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext&) override;
Expand Down
108 changes: 108 additions & 0 deletions ydb/core/kafka_proxy/ut/ut_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,11 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TString topicName = "/Root/topic-0-test";
TString shortTopicName = "topic-0-test";
TString notExistsTopicName = "/Root/not-exists";

TString tableName = "/Root/table-0-test";
TString feedName = "feed";
TString feedPath = tableName + "/" + feedName;

ui64 minActivePartitions = 10;

TString key = "record-key";
Expand Down Expand Up @@ -1207,6 +1212,60 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
}

{
NYdb::NTable::TTableClient tableClient(*testServer.Driver);
tableClient.RetryOperationSync([&](TSession session)
{
NYdb::NTable::TTableBuilder builder;
builder.AddNonNullableColumn("key", NYdb::EPrimitiveType::Int64).SetPrimaryKeyColumn("key");
builder.AddNonNullableColumn("value", NYdb::EPrimitiveType::Int64);

auto createResult = session.CreateTable(tableName, builder.Build()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(createResult.IsTransportError(), false);
Cerr << createResult.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL(createResult.GetStatus(), EStatus::SUCCESS);

auto alterResult = session.AlterTable(tableName, NYdb::NTable::TAlterTableSettings()
.AppendAddChangefeeds(NYdb::NTable::TChangefeedDescription(feedName,
NYdb::NTable::EChangefeedMode::Updates,
NYdb::NTable::EChangefeedFormat::Json))
).ExtractValueSync();
Cerr << alterResult.GetIssues().ToString() << "\n";
UNIT_ASSERT_VALUES_EQUAL(alterResult.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(alterResult.GetStatus(), EStatus::SUCCESS);
return alterResult;
}
);

TValueBuilder rows;
rows.BeginList();
rows.AddListItem()
.BeginStruct()
.AddMember("key").Int64(1)
.AddMember("value").Int64(2)
.EndStruct();
rows.EndList();

auto upsertResult = tableClient.BulkUpsert(tableName, rows.Build()).GetValueSync();
UNIT_ASSERT_EQUAL(upsertResult.GetStatus(), EStatus::SUCCESS);
}

{
// Check CDC
std::vector<std::pair<TString, std::vector<i32>>> topics {{feedPath, {0}}};
auto msg = client.Fetch(topics);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records.has_value(), true);
UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Partitions[0].Records->Records.size(), 1);
auto record = msg->Responses[0].Partitions[0].Records->Records[0];

auto data = record.Value.value();
auto dataStr = TString(data.data(), data.size());
UNIT_ASSERT_VALUES_EQUAL(dataStr, "{\"update\":{\"value\":2},\"key\":[1]}");
}

} // Y_UNIT_TEST(FetchScenario)

Y_UNIT_TEST(BalanceScenario) {
Expand Down Expand Up @@ -2300,4 +2359,53 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {

Sleep(TDuration::Seconds(1));
}

Y_UNIT_TEST(LoginWithApiKeyWithoutAt) {
TInsecureTestServer testServer;

TString topicName = "/Root/topic-0-test";

NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
{
auto result =
pqClient
.CreateTopic(topicName,
NYdb::NTopic::TCreateTopicSettings()
.PartitioningSettings(10, 100)
.BeginAddConsumer("consumer-0").EndAddConsumer())
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}

auto settings = NTopic::TReadSessionSettings()
.AppendTopics(NTopic::TTopicReadSettings(topicName))
.ConsumerName("consumer-0");
auto topicReader = pqClient.CreateReadSession(settings);

TTestClient client(testServer.Port);

{
auto msg = client.ApiVersions();

UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 18u);
}

{
auto msg = client.SaslHandshake();

UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
}

{
auto msg = client.SaslAuthenticate("/Root", "ApiKey-value-valid");
Cerr << msg->ErrorMessage << "\n";
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}

Sleep(TDuration::Seconds(1));
}
} // Y_UNIT_TEST_SUITE(KafkaProtocol)
59 changes: 50 additions & 9 deletions ydb/core/persqueue/fetch_request_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ struct TEvPrivate {
TActorId RequesterId;
ui64 PendingQuotaAmount;

std::unordered_map<TString, TString> PrivateTopicPathToCdcPath;
std::unordered_map<TString, TString> CdcPathToPrivateTopicPath;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PQ_FETCH_REQUEST;
Expand Down Expand Up @@ -176,16 +179,24 @@ struct TEvPrivate {
schemeCacheRequest->DatabaseName = Settings.Database;

THashSet<TString> topicsRequested;
for (const auto& part : Settings.Partitions) {
auto ins = topicsRequested.insert(part.Topic).second;
if (!ins)
continue;
auto split = NKikimr::SplitPath(part.Topic);

if (PrivateTopicPathToCdcPath.empty()) {
for (const auto& part : Settings.Partitions) {
topicsRequested.insert(part.Topic);
}
} else {
for (const auto& [key, value] : PrivateTopicPathToCdcPath) {
topicsRequested.insert(key);
}
}

for (const auto& topicName : topicsRequested) {
auto split = NKikimr::SplitPath(topicName);
TSchemeCacheNavigate::TEntry entry;
entry.Path.insert(entry.Path.end(), split.begin(), split.end());

entry.SyncVersion = true;
entry.ShowPrivatePath = false;
entry.ShowPrivatePath = true;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;

schemeCacheRequest->ResultSet.emplace_back(std::move(entry));
Expand All @@ -197,6 +208,7 @@ struct TEvPrivate {
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_FETCH_REQUEST, "Handle SchemeCache response");
auto& result = ev->Get()->Request;
bool anyCdcTopicInRequest = false;
for (const auto& entry : result->ResultSet) {
auto path = CanonizePath(NKikimr::JoinPath(entry.Path));
switch (entry.Status) {
Expand All @@ -219,6 +231,16 @@ struct TEvPrivate {
), ctx
);
}
if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) {
anyCdcTopicInRequest = true;
Y_ABORT_UNLESS(entry.ListNodeEntry->Children.size() == 1);
auto privateTopicPath = CanonizePath(JoinPath(ChildPath(NKikimr::SplitPath(path), entry.ListNodeEntry->Children.at(0).Name)));
PrivateTopicPathToCdcPath[privateTopicPath] = path;
CdcPathToPrivateTopicPath[path] = privateTopicPath;
TopicInfo[privateTopicPath] = TopicInfo[path];
TopicInfo.erase(path);
continue;
}
if (entry.Kind != TSchemeCacheNavigate::EKind::KindTopic) {
return SendReplyAndDie(
CreateErrorReply(
Expand Down Expand Up @@ -256,6 +278,12 @@ struct TEvPrivate {
topicInfo.BalancerTabletId = description.GetBalancerTabletID();
topicInfo.PQInfo = entry.PQGroupInfo;
}

if (anyCdcTopicInRequest) {
SendSchemeCacheRequest(ctx);
return;
}

for (auto& p: TopicInfo) {
ProcessMetadata(p.first, p.second, ctx);
}
Expand Down Expand Up @@ -393,8 +421,15 @@ struct TEvPrivate {
return SendReplyAndDie(std::move(Response), ctx);
}
Y_ABORT_UNLESS(FetchRequestReadsDone < Settings.Partitions.size());
const auto& req = Settings.Partitions[FetchRequestReadsDone];
const auto& topic = req.Topic;
auto& req = Settings.Partitions[FetchRequestReadsDone];

auto& topic = req.Topic;

auto cdcToPrivateIt = CdcPathToPrivateTopicPath.find(req.Topic);
if (cdcToPrivateIt != CdcPathToPrivateTopicPath.end()) {
topic = cdcToPrivateIt->second;
}

const auto& offset = req.Offset;
const auto& part = req.Partition;
const auto& maxBytes = req.MaxBytes;
Expand Down Expand Up @@ -462,7 +497,13 @@ struct TEvPrivate {
const auto& topic = req.Topic;
const auto& part = req.Partition;

res->SetTopic(topic);
auto privateTopicToCdcIt = PrivateTopicPathToCdcPath.find(topic);
if (privateTopicToCdcIt == PrivateTopicPathToCdcPath.end()) {
res->SetTopic(topic);
} else {
res->SetTopic(PrivateTopicPathToCdcPath[topic]);
}

res->SetPartition(part);
auto read = res->MutableReadResult();
if (record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdReadResult())
Expand Down
4 changes: 4 additions & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ namespace {
Cout << Endl << "MeteringMode: " << (TStringBuilder() << topicDescription.GetMeteringMode());
if (!topicDescription.GetSupportedCodecs().empty()) {
Cout << Endl << "SupportedCodecs: " << FormatCodecs(topicDescription.GetSupportedCodecs()) << Endl;
} else {
Cout << Endl;
}
}

Expand All @@ -352,6 +354,8 @@ namespace {
Cout << Endl << "DownUtilizationPercent: " << topicDescription.GetPartitioningSettings().GetAutoPartitioningSettings().GetDownUtilizationPercent();
Cout << Endl << "UpUtilizationPercent: " << topicDescription.GetPartitioningSettings().GetAutoPartitioningSettings().GetUpUtilizationPercent();
Cout << Endl << "StabilizationWindowSeconds: " << topicDescription.GetPartitioningSettings().GetAutoPartitioningSettings().GetStabilizationWindow().Seconds() << Endl;
} else {
Cout << Endl;
}
}
}
Expand Down
26 changes: 21 additions & 5 deletions ydb/services/lib/actors/pq_schema_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ namespace NKikimr::NGRpcProxy::V1 {
if (ProcessCdc(response)) {
return;
}

AddIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' is not compatible scheme object",
Expand Down Expand Up @@ -294,6 +293,7 @@ namespace NKikimr::NGRpcProxy::V1 {
const TMaybe<TString>& GetCdcStreamName() const {
return CdcStreamName;
}

void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev);
}
Expand Down Expand Up @@ -541,11 +541,15 @@ namespace NKikimr::NGRpcProxy::V1 {
virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) = 0;

TString GetTopicPath() const override {
return TBase::TopicPath;
auto path = TBase::TopicPath;
if (PrivateTopicName) {
path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName));
}
return path;
}

void SendDescribeProposeRequest() {
return TBase::SendDescribeProposeRequest(this->ActorContext(), false);
void SendDescribeProposeRequest(bool showPrivate = false) {
return TBase::SendDescribeProposeRequest(this->ActorContext(), showPrivate);
}

bool HandleCacheNavigateResponseBase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
Expand Down Expand Up @@ -594,16 +598,28 @@ namespace NKikimr::NGRpcProxy::V1 {
return false;
}

bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& response) override {
if constexpr (THasCdcStreamCompatibility<TDerived>::Value) {
if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {
Y_ABORT_UNLESS(response.ListNodeEntry->Children.size() == 1);
PrivateTopicName = response.ListNodeEntry->Children.at(0).Name;
SendDescribeProposeRequest(true);
return true;
}
}
return false;
}


private:
TRequest Request;
TActorId Requester;
TMaybe<TString> PrivateTopicName;

protected:
THolder<TEvResponse> Response;
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TDirEntryInfo> Self;
TMaybe<TString> PrivateTopicName;
};

}
3 changes: 2 additions & 1 deletion ydb/services/persqueue_v1/actors/schema_actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ class TAlterTopicActorInternal : public TPQInternalSchemaActor<TAlterTopicActorI
class TPartitionsLocationActor : public TPQInternalSchemaActor<TPartitionsLocationActor,
TGetPartitionsLocationRequest,
TEvPQProxy::TEvPartitionLocationResponse>
, public TDescribeTopicActorImpl {
, public TDescribeTopicActorImpl
, public TCdcStreamCompatible {

using TBase = TPQInternalSchemaActor<TPartitionsLocationActor, TGetPartitionsLocationRequest,
TEvPQProxy::TEvPartitionLocationResponse>;
Expand Down

0 comments on commit e96a66e

Please sign in to comment.