Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/fq/libs/actors/logging/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
#define LOG_STREAMS_STORAGE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_STORAGE_SERVICE, logRecordStream)

#define LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, DEBUG, STREAMS_STORAGE_SERVICE, logRecordStream)
#define LOG_STREAMS_STORAGE_SERVICE_AS_INFO(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, INFO, STREAMS_STORAGE_SERVICE, logRecordStream)
#define LOG_STREAMS_STORAGE_SERVICE_AS_WARN(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, WARN, STREAMS_STORAGE_SERVICE, logRecordStream)
#define LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ERROR, STREAMS_STORAGE_SERVICE, logRecordStream)

// Component: STREAMS_SCHEDULER_SERVICE.
#define LOG_STREAMS_SCHEDULER_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_SCHEDULER_SERVICE, logRecordStream)
Expand Down
54 changes: 25 additions & 29 deletions ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@
#include <util/string/join.h>
#include <util/string/strip.h>

#define LOG_STORAGE_ASYNC_DEBUG(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_DEBUG, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);
#define LOG_STORAGE_ASYNC_INFO(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_INFO, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);
#define LOG_STORAGE_ASYNC_WARN(actorContext, stream) LOG_LOG_S(actorContext, ::NActors::NLog::PRI_WARN, ::NKikimrServices::STREAMS_STORAGE_SERVICE, stream);

namespace NFq {

using namespace NActors;
Expand Down Expand Up @@ -149,11 +145,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvRegisterCoordinatorRequest::
auto response = std::make_unique<TEvCheckpointStorage::TEvRegisterCoordinatorResponse>();
response->Issues = issuesFuture.GetValue();
if (response->Issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] Failed to register graph: " << response->Issues.ToString())
} else {
LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] Graph registered")
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] Graph registered")
}
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] Send TEvRegisterCoordinatorResponse")
context.Send(sender, response.release(), 0, cookie);
});
}
Expand All @@ -174,7 +170,7 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
auto issues = result.second;

if (issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
return false;
}
Expand All @@ -185,9 +181,9 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
TStringStream ss;
ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize;
auto message = ss.Str();
LOG_STORAGE_ASYNC_WARN(context, message)
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, message)
issues.AddIssue(message);
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
context.Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
return false;
}
Expand Down Expand Up @@ -222,11 +218,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
auto issues = result.second;
auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move(issues), result.first);
if (response->Issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
} else {
LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint created");
}
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
context.Send(sender, response.release(), 0, cookie);
});
}
Expand All @@ -244,11 +240,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvSetCheckpointPendingCommitSt
auto issues = issuesFuture.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusResponse>(checkpointId, std::move(issues));
if (response->Issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'PendingCommit' status: " << response->Issues.ToString())
} else {
LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'PendingCommit'")
}
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvSetCheckpointPendingCommitStatusResponse")
context.Send(sender, response.release(), 0, cookie);
});
}
Expand All @@ -268,16 +264,16 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCompleteCheckpointRequest::T
auto issues = issuesFuture.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvCompleteCheckpointResponse>(checkpointId, std::move(issues));
if (response->Issues) {
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to set 'Completed' status: " << response->Issues.ToString())
} else {
LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Status updated to 'Completed'")
if (gcEnabled) {
auto request = std::make_unique<TEvCheckpointStorage::TEvNewCheckpointSucceeded>(coordinatorId, checkpointId);
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvNewCheckpointSucceeded")
context.Send(actorGC, request.release(), 0);
}
}
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCompleteCheckpointResponse")
context.Send(sender, response.release(), 0, cookie);
});
}
Expand All @@ -294,11 +290,11 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvAbortCheckpointRequest::TPtr
auto issues = issuesFuture.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvAbortCheckpointResponse>(checkpointId, std::move(issues));
if (response->Issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << coordinatorId << "] [" << checkpointId << "] Failed to abort checkpoint: " << response->Issues.ToString())
} else {
LOG_STORAGE_ASYNC_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
LOG_STREAMS_STORAGE_SERVICE_AS_INFO(context, "[" << coordinatorId << "] [" << checkpointId << "] Checkpoint aborted")
}
LOG_STORAGE_ASYNC_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvAbortCheckpointResponse")
context.Send(sender, response.release(), 0, cookie);
});
}
Expand All @@ -314,9 +310,9 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvGetCheckpointsMetadataReques
auto result = futureResult.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse>(result.first, result.second);
if (response->Issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << graphId << "] Failed to get checkpoints: " << response->Issues.ToString())
}
LOG_STORAGE_ASYNC_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << graphId << "] Send TEvGetCheckpointsMetadataResponse")
context.Send(sender, response.release(), 0, cookie);
});
}
Expand Down Expand Up @@ -356,12 +352,12 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvSaveTaskState::TPtr& ev)
response->Record.SetTaskId(taskId);

if (issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to save task state: task: " << taskId << ", issues: " << issues.ToString())
response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::STORAGE_ERROR);
} else {
response->Record.SetStatus(NYql::NDqProto::TEvSaveTaskStateResult::OK);
}
LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvSaveTaskStateResult")
context.Send(sender, response.release(), 0, cookie);
});
}
Expand All @@ -383,9 +379,9 @@ void TStorageProxy::Handle(NYql::NDq::TEvDqCompute::TEvGetTaskState::TPtr& ev) {
auto response = std::make_unique<NYql::NDq::TEvDqCompute::TEvGetTaskStateResult>(checkpointId, result.second, generation);
std::swap(response->States, result.first);
if (response->Issues) {
LOG_STORAGE_ASYNC_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(context, "[" << checkpointId << "] Failed to get task state: taskIds: {" << JoinSeq(", ", taskIds) << "}, issues: " << response->Issues.ToString());
}
LOG_STORAGE_ASYNC_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult");
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(context, "[" << checkpointId << "] Send TEvGetTaskStateResult");
context.Send(sender, response.release(), 0, cookie);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1057,7 +1057,8 @@ TFuture<TIssues> TCheckpointStorage::DeleteMarkedCheckpoints(
TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStorage::GetTotalCheckpointsStateSize(const TString& graphId) {
auto result = MakeIntrusive<TGetTotalCheckpointsStateSizeContext>();
auto future = YdbConnection->TableClient.RetryOperation(
[prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result](TSession session) {
[prefix = YdbConnection->TablePathPrefix, graphId, thisPtr = TIntrusivePtr(this), result,
context = NActors::TActivationContext::AsActorContext()](TSession session) {
NYdb::TParamsBuilder paramsBuilder;
paramsBuilder.AddParam("$graph_id").String(graphId).Build();
auto params = paramsBuilder.Build();
Expand All @@ -1079,13 +1080,12 @@ TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult> TCheckpointStor
params,
thisPtr->DefaultExecDataQuerySettings())
.Apply(
[graphId, result](const TFuture<TDataQueryResult>& future) {
[graphId, result, context](const TFuture<TDataQueryResult>& future) {
const auto& queryResult = future.GetValue();
auto status = TStatus(queryResult);

if (!queryResult.IsSuccess()) {
LOG_STREAMS_STORAGE_SERVICE_ERROR(TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString());
return status;
LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(context, TStringBuilder() << "GetTotalCheckpointsStateSize: can't get total graph's checkpoints size [" << graphId << "] " << queryResult.GetIssues().ToString()); return status;
}

TResultSetParser parser = queryResult.GetResultSetParser(0);
Expand Down