Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions ydb/core/ymq/actor/action.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ class TActionActor
DoBootstrap();
}

void Bootstrap(const NActors::TActorContext&) {
void Bootstrap(const NActors::TActorContext&) {
#define SQS_REQUEST_CASE(action) \
const auto& request = SourceSqsRequest_.Y_CAT(Get, action)(); \
auto response = Response_.Y_CAT(Mutable, action)(); \
FillAuthInformation(request); \
response->SetRequestId(RequestId_);

SQS_SWITCH_REQUEST_CUSTOM(SourceSqsRequest_, ENUMERATE_ALL_ACTIONS, Y_ABORT_UNLESS(false));
#undef SQS_REQUEST_CASE
#undef SQS_REQUEST_CASE

RLOG_SQS_DEBUG("Request started. Actor: " << this->SelfId()); // log new request id
StartTs_ = TActivationContext::Now();
Expand All @@ -130,6 +130,7 @@ class TActionActor

// Set timeout
if (cfg.GetRequestTimeoutMs()) {
TimeoutCookie_.Reset(ISchedulerCookie::Make2Way());
this->Schedule(TDuration::MilliSeconds(cfg.GetRequestTimeoutMs()), new TEvWakeup(REQUEST_TIMEOUT_WAKEUP_TAG), TimeoutCookie_.Get());
}

Expand Down Expand Up @@ -349,7 +350,7 @@ class TActionActor
RESPONSE_BATCH_CASE(SendMessageBatch)
RESPONSE_CASE(SetQueueAttributes)
RESPONSE_CASE(ListDeadLetterSourceQueues)
RESPONSE_CASE(CountQueues)
RESPONSE_CASE(CountQueues)
case NKikimrClient::TSqsResponse::kDeleteQueueBatch:
case NKikimrClient::TSqsResponse::kGetQueueAttributesBatch:
case NKikimrClient::TSqsResponse::kPurgeQueueBatch:
Expand Down Expand Up @@ -382,7 +383,7 @@ class TActionActor
);
}

protected:
protected:
template <class TResponse>
void AuditLogEntry(const TResponse& response, const TString& requestId, const TError* error = nullptr) {
if (!error && response.HasError()) {
Expand Down Expand Up @@ -555,7 +556,7 @@ class TActionActor
UserName_ = request.GetAuth().GetUserName();
FolderId_ = request.GetAuth().GetFolderId();
UserSID_ = request.GetAuth().GetUserSID();

if (IsCloud() && !FolderId_) {
auto items = ParseCloudSecurityToken(SecurityToken_);
UserName_ = std::get<0>(items);
Expand Down Expand Up @@ -885,7 +886,7 @@ class TActionActor
TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResources_;
bool NeedReportSqsActionInflyCounter = false;
bool NeedReportYmqActionInflyCounter = false;
TSchedulerCookieHolder TimeoutCookie_ = ISchedulerCookie::Make2Way();
TSchedulerCookieHolder TimeoutCookie_;
NKikimrClient::TSqsRequest SourceSqsRequest_;
};

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/ymq/actor/proxy_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TString SecurityPrint(const NKikimrClient::TSqsResponse& resp) {
case NKikimrClient::TSqsResponse::kReceiveMessage: {
NKikimrClient::TSqsResponse respCopy = resp;
for (auto& msg : *respCopy.MutableReceiveMessage()->MutableMessages()) {
msg.SetData(TStringBuilder() << "[...user_data_" << msg.GetData().size() << "bytes" << "...]");
msg.SetData(TStringBuilder() << "[...user_data_" << msg.GetData().size() << "bytes" << "...]");
}
return TStringBuilder() << respCopy;
}
Expand Down Expand Up @@ -82,6 +82,7 @@ void TProxyActor::Bootstrap() {

const auto& cfg = Cfg();
if (cfg.GetRequestTimeoutMs()) {
TimeoutCookie_.Reset(ISchedulerCookie::Make2Way());
this->Schedule(TDuration::MilliSeconds(cfg.GetRequestTimeoutMs()), new TEvWakeup(), TimeoutCookie_.Get());
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/ymq/actor/proxy_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class TProxyActor
THolder<IReplyCallback> Cb_;
bool ErrorResponse_ = false;
TInstant StartTs_;
TSchedulerCookieHolder TimeoutCookie_ = ISchedulerCookie::Make2Way();
TSchedulerCookieHolder TimeoutCookie_;

TIntrusivePtr<TUserCounters> UserCounters_;
TIntrusivePtr<TQueueCounters> QueueCounters_;
Expand Down
Loading