From ed48bcb2a141f206950c6356ce3c2a28ae468709 Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Thu, 12 Sep 2024 10:11:06 +0000 Subject: [PATCH 1/2] Fix reading without consumer from federation --- .../actors/read_init_auth_actor.cpp | 2 +- ydb/services/persqueue_v1/persqueue_ut.cpp | 84 +++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 91bf31dd8895..1838d811e0ef 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -180,7 +180,7 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon } // ToDo[migration] - separate option - ? - bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen(); + bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !SkipReadRuleCheck; if (doCheckClientAcl) { CheckClientACL(ctx); } else { diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 7ea80350b2a2..0664aa3309cd 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -6997,6 +6997,90 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; driver->Stop(); } + Y_UNIT_TEST(ReadWithoutConsumerFederation) { + const ui32 partititonsCount = 5; + + TPersQueueV1TestServer server; + server.Server->AnnoyingClient->CreateTopic("rt3.dc1--topic2", partititonsCount); + + auto writeSettings = NYdb::NPersQueue::TWriteSessionSettings() + .Path("rt3.dc1--topic2") + .MessageGroupId("src_id"); + + auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(writeSettings); + + auto res = writer->Write("some_data"); + UNIT_ASSERT(res); + writer->Close(); + + std::shared_ptr Channel_; + std::unique_ptr StubP_; + + Channel_ = grpc::CreateChannel("localhost:" + ToString(server.Server->GrpcPort), grpc::InsecureChannelCredentials()); + StubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); + + grpc::ClientContext rcontext; + auto readStream = StubP_->StreamRead(&rcontext); + UNIT_ASSERT(readStream); + + { + Ydb::Topic::StreamReadMessage::FromClient req; + Ydb::Topic::StreamReadMessage::FromServer resp; + + auto topicReadSettings = req.mutable_init_request()->add_topics_read_settings(); + topicReadSettings->set_path("rt3.dc1--topic2"); + for (ui32 i = 0; i < partititonsCount; i++) { + topicReadSettings->add_partition_ids(i); + } + + req.mutable_init_request()->set_consumer(""); + + if (!readStream->Write(req)) { + ythrow yexception() << "write fail"; + } + + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse); + } + ui32 partitionsSigned = 0; + + while (partitionsSigned != partititonsCount) { + + Ydb::Topic::StreamReadMessage::FromServer resp; + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest, resp); + auto assignId = resp.start_partition_session_request().partition_session().partition_session_id(); + + Ydb::Topic::StreamReadMessage::FromClient req; + req.mutable_start_partition_session_response()->set_partition_session_id(assignId); + req.mutable_start_partition_session_response()->set_read_offset(0); + auto res = readStream->Write(req); + UNIT_ASSERT(res); + partitionsSigned++; + } + ui32 offset = 0; + ui32 session = 0; + + Ydb::Topic::StreamReadMessage::FromClient req; + req.mutable_read_request()->set_bytes_size(1); + readStream->Write(req); + + Ydb::Topic::StreamReadMessage::FromServer resp; + UNIT_ASSERT(readStream->Read(&resp)); + UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); + Cerr << "\n" << "Bytes readed: " << resp.read_response().bytes_size() << "\n"; + for (int j = 0; j < resp.read_response().partition_data_size(); j++) { + for (int k = 0; k < resp.read_response().partition_data(j).batches_size(); k++) { + for (int l = 0; l < resp.read_response().partition_data(j).batches(k).message_data_size(); l++) { + offset = resp.read_response().partition_data(j).batches(k).message_data(l).offset(); + session = resp.read_response().partition_data(j).partition_session_id(); + Cerr << "\n" << "Offset: " << offset << " from session " << session << "\n"; + } + } + } + + } + Y_UNIT_TEST(ReadWithoutConsumer) { auto readToEndThenCommit = [] (NPersQueue::TTestServer& server, ui32 partitions, ui32 maxOffset, TString consumer, ui32 readByBytes) { std::shared_ptr Channel_; From 075b0df3ba978e61143186ee518e0ad6c5feab8e Mon Sep 17 00:00:00 2001 From: Nikita Saveliev Date: Thu, 12 Sep 2024 10:17:19 +0000 Subject: [PATCH 2/2] Fix --- ydb/services/persqueue_v1/persqueue_ut.cpp | 32 ++++++++-------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 0664aa3309cd..02b50429c4bb 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -712,7 +712,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); } - Y_UNIT_TEST(UpdatePartitionLocation) { TPersQueueV1TestServer server; SET_LOCALS; @@ -6631,7 +6630,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Y_UNIT_TEST(PartitionsMapping) { NPersQueue::TTestServer server; - TString topic = "topic1"; TString topicFullName = "rt3.dc1--" + topic; @@ -6999,13 +6997,18 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Y_UNIT_TEST(ReadWithoutConsumerFederation) { const ui32 partititonsCount = 5; + const auto topic = "rt3.dc1--topic2"; TPersQueueV1TestServer server; - server.Server->AnnoyingClient->CreateTopic("rt3.dc1--topic2", partititonsCount); + server.Server->AnnoyingClient->CreateTopic(topic, partititonsCount); + + NACLib::TDiffACL acl; + acl.AddAccess(NACLib::EAccessType::Allow, NACLib::GenericFull, "user@" BUILTIN_ACL_DOMAIN); + server.Server->AnnoyingClient->ModifyACL("/Root/PQ", topic, acl.SerializeAsString()); auto writeSettings = NYdb::NPersQueue::TWriteSessionSettings() - .Path("rt3.dc1--topic2") - .MessageGroupId("src_id"); + .Path(topic) + .MessageGroupId("src_id"); auto writer = server.PersQueueClient->CreateSimpleBlockingWriteSession(writeSettings); @@ -7020,15 +7023,15 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; StubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_); grpc::ClientContext rcontext; + rcontext.AddMetadata("x-ydb-auth-ticket", "user@" BUILTIN_ACL_DOMAIN); auto readStream = StubP_->StreamRead(&rcontext); UNIT_ASSERT(readStream); { Ydb::Topic::StreamReadMessage::FromClient req; Ydb::Topic::StreamReadMessage::FromServer resp; - auto topicReadSettings = req.mutable_init_request()->add_topics_read_settings(); - topicReadSettings->set_path("rt3.dc1--topic2"); + topicReadSettings->set_path(topic); for (ui32 i = 0; i < partititonsCount; i++) { topicReadSettings->add_partition_ids(i); } @@ -7058,8 +7061,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; UNIT_ASSERT(res); partitionsSigned++; } - ui32 offset = 0; - ui32 session = 0; Ydb::Topic::StreamReadMessage::FromClient req; req.mutable_read_request()->set_bytes_size(1); @@ -7068,20 +7069,9 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}}; Ydb::Topic::StreamReadMessage::FromServer resp; UNIT_ASSERT(readStream->Read(&resp)); UNIT_ASSERT_C(resp.server_message_case() == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse, resp); - Cerr << "\n" << "Bytes readed: " << resp.read_response().bytes_size() << "\n"; - for (int j = 0; j < resp.read_response().partition_data_size(); j++) { - for (int k = 0; k < resp.read_response().partition_data(j).batches_size(); k++) { - for (int l = 0; l < resp.read_response().partition_data(j).batches(k).message_data_size(); l++) { - offset = resp.read_response().partition_data(j).batches(k).message_data(l).offset(); - session = resp.read_response().partition_data(j).partition_session_id(); - Cerr << "\n" << "Offset: " << offset << " from session " << session << "\n"; - } - } - } - } - Y_UNIT_TEST(ReadWithoutConsumer) { + Y_UNIT_TEST(ReadWithoutConsumerFirstClassCitizen) { auto readToEndThenCommit = [] (NPersQueue::TTestServer& server, ui32 partitions, ui32 maxOffset, TString consumer, ui32 readByBytes) { std::shared_ptr Channel_; std::unique_ptr StubP_;