Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,11 +469,11 @@ namespace NKikimr::NHttpProxy {
auto queueUrl = QueueUrlExtractor(Request);
if (!queueUrl.empty()) {
auto cloudIdAndResourceId = NKikimr::NYmq::CloudIdAndResourceIdFromQueueUrl(queueUrl);
if(cloudIdAndResourceId.Empty()) {
if (cloudIdAndResourceId.first.empty()) {
return ReplyWithError(ctx, NYdb::EStatus::BAD_REQUEST, "Invalid queue url");
}
CloudId = cloudIdAndResourceId.Get()->first;
ResourceId = cloudIdAndResourceId.Get()->second;
CloudId = cloudIdAndResourceId.first;
ResourceId = cloudIdAndResourceId.second;
}
} catch (const NKikimr::NSQS::TSQSException& e) {
NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK;
Expand Down
22 changes: 14 additions & 8 deletions ydb/core/http_proxy/ut/datastreams_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ T GetByPath(const NJson::TJsonValue& msg, TStringBuf path) {
}
}


class THttpProxyTestMock : public NUnitTest::TBaseFixture {
friend class THttpProxyTestMockForSQS;
public:
THttpProxyTestMock() = default;
~THttpProxyTestMock() = default;
Expand All @@ -81,12 +81,12 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
InitAll();
}

void InitAll() {
void InitAll(bool yandexCloudMode = true) {
AccessServicePort = PortManager.GetPort(8443);
AccessServiceEndpoint = "127.0.0.1:" + ToString(AccessServicePort);
InitKikimr();
InitKikimr(yandexCloudMode);
InitAccessServiceService();
InitHttpServer();
InitHttpServer(yandexCloudMode);
}

static TString FormAuthorizationStr(const TString& region) {
Expand Down Expand Up @@ -365,7 +365,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
return resultSet;
}

void InitKikimr() {
void InitKikimr(bool yandexCloudMode) {
AuthFactory = std::make_shared<TIamAuthFactory>();
NKikimrConfig::TAppConfig appConfig;
appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
Expand All @@ -376,7 +376,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true);

appConfig.MutableSqsConfig()->SetEnableSqs(true);
appConfig.MutableSqsConfig()->SetYandexCloudMode(true);
appConfig.MutableSqsConfig()->SetYandexCloudMode(yandexCloudMode);
appConfig.MutableSqsConfig()->SetEnableDeadLetterQueues(true);

auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
Expand Down Expand Up @@ -639,7 +639,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
AccessServiceServer = builder.BuildAndStart();
}

void InitHttpServer() {
void InitHttpServer(bool yandexCloudMode) {
NKikimrConfig::TServerlessProxyConfig config;
config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central1");
config.MutableHttpConfig()->AddYandexCloudServiceRegion("ru-central-1");
Expand All @@ -649,7 +649,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
config.MutableHttpConfig()->SetAccessServiceEndpoint(TStringBuilder() << "127.0.0.1:" << AccessServicePort);
config.SetTestMode(true);
config.MutableHttpConfig()->SetPort(HttpServicePort);
config.MutableHttpConfig()->SetYandexCloudMode(true);
config.MutableHttpConfig()->SetYandexCloudMode(yandexCloudMode);
config.MutableHttpConfig()->SetYmqEnabled(true);

std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory = NYdb::CreateOAuthCredentialsProviderFactory("proxy_sa@builtin");
Expand Down Expand Up @@ -760,3 +760,9 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
ui16 MonPort = 0;
ui16 KikimrGrpcPort = 0;
};

class THttpProxyTestMockForSQS : public THttpProxyTestMock {
void SetUp(NUnitTest::TTestContext&) override {
InitAll(false);
}
};
12 changes: 12 additions & 0 deletions ydb/core/http_proxy/ut/http_proxy_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,18 @@ Y_UNIT_TEST_SUITE(TestHttpProxy) {
UNIT_ASSERT(!GetByPath<TString>(json, "MessageId").empty());
}

Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
NJson::TJsonValue sendMessageReq;
sendMessageReq["QueueUrl"] = "";
auto body = "MessageBody-0";
sendMessageReq["MessageBody"] = body;
sendMessageReq["MessageDeduplicationId"] = "MessageDeduplicationId-0";
sendMessageReq["MessageGroupId"] = "MessageGroupId-0";

auto res = SendHttpRequest("/Root", "AmazonSQS.SendMessage", std::move(sendMessageReq), FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400);
}

Y_UNIT_TEST_F(TestReceiveMessage, THttpProxyTestMock) {
auto createQueueReq = CreateSqsCreateQueueRequest();
auto res = SendHttpRequest("/Root", "AmazonSQS.CreateQueue", createQueueReq, FormAuthorizationStr("ru-central1"));
Expand Down
25 changes: 25 additions & 0 deletions ydb/services/ymq/utils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "utils.h"

#include <util/string/split.h>
#include <ydb/core/ymq/base/utils.h>


namespace NKikimr::NYmq {
std::pair<TString, TString> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) {
auto protocolSeparator = queueUrl.find("://");
if (protocolSeparator == TString::npos) {
return {"", ""};
}

auto restOfUrl = queueUrl.substr(protocolSeparator + 3);
auto parts = StringSplitter(restOfUrl).Split('/').ToList<TString>();
if (parts.size() < 3) {
return {"", ""};
}

bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl);
TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest);
TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest);
return {accountName, queueName};
}
}
21 changes: 1 addition & 20 deletions ydb/services/ymq/utils.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,7 @@
#pragma once

#include <util/generic/string.h>
#include <util/generic/maybe.h>
#include <util/string/split.h>
#include <ydb/core/ymq/base/utils.h>

namespace NKikimr::NYmq {
inline static TMaybe<std::pair<TString, TString>> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl) {
auto protocolSeparator = queueUrl.find("://");
if (protocolSeparator == TString::npos) {
return Nothing();
}

auto restOfUrl = queueUrl.substr(protocolSeparator + 3);
auto parts = StringSplitter(restOfUrl).Split('/').ToList<TString>();
if (parts.size() < 3) {
return Nothing();
}

bool isPrivateRequest = NKikimr::NSQS::IsPrivateRequest(restOfUrl);
TString queueName = NKikimr::NSQS::ExtractQueueNameFromPath(restOfUrl, isPrivateRequest);
TString accountName = NKikimr::NSQS::ExtractAccountNameFromPath(restOfUrl, isPrivateRequest);
return std::pair<TString, TString>(std::move(accountName), std::move(queueName));
}
std::pair<TString, TString> CloudIdAndResourceIdFromQueueUrl(const TString& queueUrl);
}
1 change: 1 addition & 0 deletions ydb/services/ymq/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ LIBRARY()
SRCS(
ymq_proxy.cpp
grpc_service.cpp
utils.cpp
)

PEERDIR(
Expand Down
24 changes: 12 additions & 12 deletions ydb/services/ymq/ymq_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ namespace NKikimr::NYmq::V1 {
COPY_FIELD_IF_PRESENT(message_deduplication_id, MessageDeduplicationId);
COPY_FIELD_IF_PRESENT(message_group_id, MessageGroupId);

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);

result->SetMessageBody(GetProtoRequest()->Getmessage_body());

Expand Down Expand Up @@ -376,7 +376,7 @@ namespace NKikimr::NYmq::V1 {
NKikimr::NSQS::TReceiveMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableReceiveMessage();

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);

COPY_FIELD_IF_PRESENT(max_number_of_messages, MaxNumberOfMessages);
COPY_FIELD_IF_PRESENT(receive_request_attempt_id, ReceiveRequestAttemptId);
Expand Down Expand Up @@ -496,7 +496,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TGetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableGetQueueAttributes();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
for (const auto& attributeName : GetProtoRequest()->Getattribute_names()) {
result->MutableNames()->Add()->assign(attributeName);
}
Expand Down Expand Up @@ -574,7 +574,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TDeleteMessageRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableDeleteMessage();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
result->SetReceiptHandle(GetProtoRequest()->receipt_handle());
return result;
}
Expand Down Expand Up @@ -607,7 +607,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TPurgeQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutablePurgeQueue();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
return result;
}
};
Expand Down Expand Up @@ -639,7 +639,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TDeleteQueueRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableDeleteQueue();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
return result;
}
};
Expand Down Expand Up @@ -671,7 +671,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TChangeMessageVisibilityRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableChangeMessageVisibility();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
result->SetReceiptHandle(GetProtoRequest()->Getreceipt_handle());
result->SetVisibilityTimeout(GetProtoRequest()->Getvisibility_timeout());
return result;
Expand Down Expand Up @@ -713,7 +713,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TSetQueueAttributesRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableSetQueueAttributes();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
for (auto& [name, value]: GetProtoRequest()->Getattributes()) {
AddAttribute(requestHolder, name, value);
}
Expand Down Expand Up @@ -751,7 +751,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TListDeadLetterSourceQueuesRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableListDeadLetterSourceQueues();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->queue_url()).second);
return result;
}
};
Expand Down Expand Up @@ -803,7 +803,7 @@ namespace NKikimr::NYmq::V1 {
NKikimr::NSQS::TSendMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableSendMessageBatch();

result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);

for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder.MutableSendMessageBatch()->MutableEntries()->Add();
Expand Down Expand Up @@ -870,7 +870,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TDeleteMessageBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableDeleteMessageBatch();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder.MutableDeleteMessageBatch()->AddEntries();
entry->SetId(requestEntry.Getid());
Expand Down Expand Up @@ -921,7 +921,7 @@ namespace NKikimr::NYmq::V1 {
private:
NKikimr::NSQS::TChangeMessageVisibilityBatchRequest* GetRequest(TSqsRequest& requestHolder) override {
auto result = requestHolder.MutableChangeMessageVisibilityBatch();
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url())->second);
result->SetQueueName(CloudIdAndResourceIdFromQueueUrl(GetProtoRequest()->Getqueue_url()).second);
for (auto& requestEntry : GetProtoRequest()->Getentries()) {
auto entry = requestHolder.MutableChangeMessageVisibilityBatch()->MutableEntries()->Add();
entry->SetId(requestEntry.Getid());
Expand Down
Loading