diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 34236580c271..19ef8330767e 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -469,11 +469,11 @@ namespace NKikimr::NHttpProxy { auto queueUrl = QueueUrlExtractor(Request); if (!queueUrl.empty()) { auto cloudIdAndResourceId = NKikimr::NYmq::CloudIdAndResourceIdFromQueueUrl(queueUrl); - if(cloudIdAndResourceId.Empty()) { + if (cloudIdAndResourceId.first.empty()) { return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url"); } - CloudId = cloudIdAndResourceId.Get()->first; - ResourceId = cloudIdAndResourceId.Get()->second; + CloudId = cloudIdAndResourceId.first; + ResourceId = cloudIdAndResourceId.second; } } catch (const NKikimr::NSQS::TSQSException& e) { NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK; diff --git a/ydb/core/http_proxy/ut/datastreams_fixture.h b/ydb/core/http_proxy/ut/datastreams_fixture.h index de3c4a4eb3fd..741ad480a9e6 100644 --- a/ydb/core/http_proxy/ut/datastreams_fixture.h +++ b/ydb/core/http_proxy/ut/datastreams_fixture.h @@ -67,8 +67,8 @@ T GetByPath(const NJson::TJsonValue& msg, TStringBuf path) { } } - class THttpProxyTestMock : public NUnitTest::TBaseFixture { + friend class THttpProxyTestMockForSQS; public: THttpProxyTestMock() = default; ~THttpProxyTestMock() = default; @@ -81,12 +81,12 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { InitAll(); } - void InitAll() { + void InitAll(bool yandexCloudMode = true) { AccessServicePort = PortManager.GetPort(8443); AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort); - InitKikimr(); + InitKikimr(yandexCloudMode); InitAccessServiceService(); - InitHttpServer(); + InitHttpServer(yandexCloudMode); } static TString FormAuthorizationStr(const TString& region) { @@ -365,7 +365,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { return resultSet; } - void InitKikimr() { + void InitKikimr(bool yandexCloudMode) { AuthFactory = std::make_shared(); NKikimrConfig::TAppConfig appConfig; appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true); @@ -376,7 +376,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true); appConfig.MutableSqsConfig()->SetEnableSqs(true); - appConfig.MutableSqsConfig()->SetYandexCloudMode(true); + appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode); appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true); auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits(); @@ -639,7 +639,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { AccessServiceServer = builder.BuildAndStart(); } - void InitHttpServer() { + void InitHttpServer(bool yandexCloudMode) { NKikimrConfig::TServerlessProxyConfig config; config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central1"); config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central-1"); @@ -649,7 +649,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { config.MutableHttpConfig()->SetAccessServiceEndpoint(TStringBuilder() << "127.0.0.1:" << AccessServicePort); config.SetTestMode(true); config.MutableHttpConfig()->SetPort(HttpServicePort); - config.MutableHttpConfig()->SetYandexCloudMode(true); + config.MutableHttpConfig()->SetYandexCloudMode(yandexCloudMode); config.MutableHttpConfig()->SetYmqEnabled(true); std::shared_ptr credentialsProviderFactory = NYdb::CreateOAuthCredentialsProviderFactory("proxy_sa@builtin"); @@ -760,3 +760,9 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture { ui16 MonPort = 0; ui16 KikimrGrpcPort = 0; }; + +class THttpProxyTestMockForSQS : public THttpProxyTestMock { + void SetUp(NUnitTest::TTestContext&) override { + InitAll(false); + } +}; diff --git a/ydb/core/http_proxy/ut/http_proxy_ut.h b/ydb/core/http_proxy/ut/http_proxy_ut.h index 39ea72747ccc..abd778e1cd48 100644 --- a/ydb/core/http_proxy/ut/http_proxy_ut.h +++ b/ydb/core/http_proxy/ut/http_proxy_ut.h @@ -1663,6 +1663,18 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) { UNIT_ASSERT(!GetByPath(json, "MessageId").empty()); } + Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) { + NJson::TJsonValue sendMessageReq; + sendMessageReq["QueueUrl"] = ""; + auto body = "MessageBody-0"; + sendMessageReq["MessageBody"] = body; + sendMessageReq["MessageDeduplicationId"] = "MessageDeduplicationId-0"; + sendMessageReq["MessageGroupId"] = "MessageGroupId-0"; + + auto res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1")); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400); + } + Y_UNIT_TEST_F(TestReceiveMessage, THttpProxyTestMock) { auto createQueueReq = CreateSqsCreateQueueRequest(); auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", createQueueReq, FormAuthorizationStr("ru-central1")); diff --git a/ydb/services/ymq/utils.cpp b/ydb/services/ymq/utils.cpp new file mode 100644 index 000000000000..085f3c1db13d --- /dev/null +++ b/ydb/services/ymq/utils.cpp @@ -0,0 +1,25 @@ +#include "utils.h" + +#include +#include + + +namespace NKikimr::NYmq { + std::pair CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) { + auto protocolSeparator = queueUrl.find("://"); + if (protocolSeparator == TString::npos) { + return {"", ""}; + } + + auto restOfUrl = queueUrl.substr(protocolSeparator + 3); + auto parts = StringSplitter(restOfUrl).Split('/').ToList(); + if (parts.size() < 3) { + return {"", ""}; + } + + bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl); + TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest); + TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest); + return {accountName, queueName}; + } +} diff --git a/ydb/services/ymq/utils.h b/ydb/services/ymq/utils.h index d5cd58f8a3bf..f643b80e80fc 100644 --- a/ydb/services/ymq/utils.h +++ b/ydb/services/ymq/utils.h @@ -1,26 +1,7 @@ #pragma once #include -#include -#include -#include namespace NKikimr::NYmq { - inline static TMaybe> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) { - auto protocolSeparator = queueUrl.find("://"); - if (protocolSeparator == TString::npos) { - return Nothing(); - } - - auto restOfUrl = queueUrl.substr(protocolSeparator + 3); - auto parts = StringSplitter(restOfUrl).Split('/').ToList(); - if (parts.size() < 3) { - return Nothing(); - } - - bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl); - TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest); - TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest); - return std::pair(std::move(accountName), std::move(queueName)); - } + std::pair CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl); } diff --git a/ydb/services/ymq/ya.make b/ydb/services/ymq/ya.make index 4a1bcf90b231..4d9c71fd367b 100644 --- a/ydb/services/ymq/ya.make +++ b/ydb/services/ymq/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( ymq_proxy.cpp grpc_service.cpp + utils.cpp ) PEERDIR( diff --git a/ydb/services/ymq/ymq_proxy.cpp b/ydb/services/ymq/ymq_proxy.cpp index 6a60db3179f8..40ace507a49d 100644 --- a/ydb/services/ymq/ymq_proxy.cpp +++ b/ydb/services/ymq/ymq_proxy.cpp @@ -290,7 +290,7 @@ namespace NKikimr::NYmq::V1 { COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId); COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second); result->SetMessageBody(GetProtoRequest()->Getmessage_body()); @@ -376,7 +376,7 @@ namespace NKikimr::NYmq::V1 { NKikimr::NSQS::TReceiveMessageRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableReceiveMessage(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second); COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages); COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId); @@ -496,7 +496,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TGetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableGetQueueAttributes(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second); for (const auto& attributeName : GetProtoRequest()->Getattribute_names()) { result->MutableNames()->Add()->assign(attributeName); } @@ -574,7 +574,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TDeleteMessageRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableDeleteMessage(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second); result->SetReceiptHandle(GetProtoRequest()->receipt_handle()); return result; } @@ -607,7 +607,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TPurgeQueueRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutablePurgeQueue(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second); return result; } }; @@ -639,7 +639,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TDeleteQueueRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableDeleteQueue(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second); return result; } }; @@ -671,7 +671,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TChangeMessageVisibilityRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableChangeMessageVisibility(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second); result->SetReceiptHandle(GetProtoRequest()->Getreceipt_handle()); result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout()); return result; @@ -713,7 +713,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableSetQueueAttributes(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second); for (auto& [name, value]: GetProtoRequest()->Getattributes()) { AddAttribute(requestHolder, name, value); } @@ -751,7 +751,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableListDeadLetterSourceQueues(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second); return result; } }; @@ -803,7 +803,7 @@ namespace NKikimr::NYmq::V1 { NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableSendMessageBatch(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second); for (auto& requestEntry : GetProtoRequest()->Getentries()) { auto entry = requestHolder.MutableSendMessageBatch()->MutableEntries()->Add(); @@ -870,7 +870,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableDeleteMessageBatch(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second); for (auto& requestEntry : GetProtoRequest()->Getentries()) { auto entry = requestHolder.MutableDeleteMessageBatch()->AddEntries(); entry->SetId(requestEntry.Getid()); @@ -921,7 +921,7 @@ namespace NKikimr::NYmq::V1 { private: NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(TSqsRequest& requestHolder) override { auto result = requestHolder.MutableChangeMessageVisibilityBatch(); - result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second); + result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second); for (auto& requestEntry : GetProtoRequest()->Getentries()) { auto entry = requestHolder.MutableChangeMessageVisibilityBatch()->MutableEntries()->Add(); entry->SetId(requestEntry.Getid());