Skip to content

Commit

Permalink
Merge b55e18f into 70e1f23
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Jun 15, 2024
2 parents 70e1f23 + b55e18f commit b219a3c
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 52 deletions.
2 changes: 1 addition & 1 deletion ydb/core/grpc_services/rpc_forget_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
}

void SendForgetScriptExecutionOperation() {
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId, Request->GetDeadline()));
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
}

public:
Expand Down
11 changes: 4 additions & 7 deletions ydb/core/kqp/common/events/script_executions.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@ enum EFinalizationStatus : i32 {
};

struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id, TInstant deadline)
TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
, OperationId(id)
, Deadline(deadline)
{
}
{}

TString Database;
NOperationId::TOperationId OperationId;
TInstant Deadline;
const TString Database;
const NOperationId::TOperationId OperationId;
};

struct TEvForgetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponse> {
Expand Down
56 changes: 26 additions & 30 deletions ydb/core/kqp/proxy_service/kqp_script_executions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -823,11 +823,10 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
static constexpr i32 MAX_NUMBER_ROWS_IN_BATCH = 100000;

public:
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database, TInstant operationDeadline)
TForgetScriptExecutionOperationQueryActor(const TString& executionId, const TString& database)
: TQueryBase(__func__, executionId)
, ExecutionId(executionId)
, Database(database)
, Deadline(operationDeadline)
{}

void OnRunQuery() override {
Expand Down Expand Up @@ -891,7 +890,16 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
}
MaxRowId = *maxRowId;

ClearTimeInfo();
if (MaxRowId >= NumberRowsInBatch) {
TStringBuilder message = TStringBuilder() << "Query result rows count is " << MaxRowId + 1;
if (*maxResultSetId > 0) {
message << " in " << *maxResultSetId + 1 << " result sets";
}
NYql::TIssue issue(message << ", that is larger than allowed limit " << MAX_NUMBER_ROWS_IN_BATCH << " rows for one time forget, results will be forgotten in the background process");
issue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
SendResponse(Ydb::StatusIds::SUCCESS, {issue});
}

DeleteScriptResults();
}

Expand Down Expand Up @@ -937,34 +945,34 @@ class TForgetScriptExecutionOperationQueryActor : public TQueryBase {
return;
}

if (TInstant::Now() + 2 * GetAverageTime() >= Deadline) {
Finish(Ydb::StatusIds::TIMEOUT, ForgetOperationTimeoutIssues());
return;
}

DeleteScriptResults();
}

void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
SendResponse(status, std::move(issues));
}

static NYql::TIssues ForgetOperationTimeoutIssues() {
return { NYql::TIssue("Forget script execution operation timeout") };
private:
void SendResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) {
if (ResponseSent) {
return;
}
ResponseSent = true;
Send(Owner, new TEvForgetScriptExecutionOperationResponse(status, std::move(issues)));
}

private:
TString ExecutionId;
TString Database;
TInstant Deadline;
const TString ExecutionId;
const TString Database;
i64 NumberRowsInBatch = 0;
i64 MaxRowId = 0;
bool ResponseSent = false;
};

class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetScriptExecutionOperationActor> {
public:
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString, TInstant>;
using TForgetOperationRetryActor = TQueryRetryActor<TForgetScriptExecutionOperationQueryActor, TEvForgetScriptExecutionOperationResponse, TString, TString>;

public:
explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev)
: Request(std::move(ev))
{}
Expand Down Expand Up @@ -1002,19 +1010,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
}

KQP_PROXY_LOG_D("[TForgetScriptExecutionOperationActor] ExecutionId: " << ExecutionId << ", lease check success. Start TForgetOperationRetryActor");

TDuration minDelay = TDuration::MilliSeconds(10);
TDuration maxTime = Request->Get()->Deadline - TInstant::Now();
if (maxTime <= minDelay) {
Reply(Ydb::StatusIds::TIMEOUT, TForgetScriptExecutionOperationQueryActor::ForgetOperationTimeoutIssues());
return;
}

Register(new TForgetOperationRetryActor(
SelfId(),
TForgetOperationRetryActor::IRetryPolicy::GetExponentialBackoffPolicy(TForgetOperationRetryActor::Retryable, minDelay, TDuration::MilliSeconds(200), TDuration::Seconds(1), std::numeric_limits<size_t>::max(), maxTime),
ExecutionId, Request->Get()->Database, TInstant::Now() + maxTime
));
Register(new TForgetOperationRetryActor(SelfId(), ExecutionId, Request->Get()->Database));
}

void Handle(TEvForgetScriptExecutionOperationResponse::TPtr& ev) {
Expand Down Expand Up @@ -1042,7 +1038,7 @@ class TForgetScriptExecutionOperationActor : public TActorBootstrapped<TForgetSc
}

private:
TEvForgetScriptExecutionOperation::TPtr Request;
const TEvForgetScriptExecutionOperation::TPtr Request;
TString ExecutionId;
bool ExecutionEntryExists = true;
};
Expand Down
18 changes: 5 additions & 13 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1752,22 +1752,14 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_VALUES_EQUAL(rowsFetched, numberRows);

// Test forget operation
TInstant forgetOperationTimeout = TInstant::Now() + NSan::PlainOrUnderSanitizer(TDuration::Minutes(5), TDuration::Minutes(20));
NYdb::NOperation::TOperationClient operationClient(kikimr->GetDriver());
while (TInstant::Now() < forgetOperationTimeout) {
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
if (status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::NOT_FOUND) {
return;
}

UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::ABORTED || status.GetStatus() == NYdb::EStatus::TIMEOUT || status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED, status.GetIssues().ToString());
auto status = operationClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToOneLineString());

if (status.GetStatus() == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED) {
// Wait until last forget is not finished
Sleep(TDuration::Seconds(30));
}
const size_t forgetRowsLimit = 100000;
if (numberRows > forgetRowsLimit) {
UNIT_ASSERT_STRING_CONTAINS(status.GetIssues().ToString(), TStringBuilder() << "Info: Query result rows count is " << numberRows << ", that is larger than allowed limit " << forgetRowsLimit << " rows for one time forget, results will be forgotten in the background process");
}
UNIT_ASSERT_C(false, "Forget operation timeout");
}

Y_UNIT_TEST(ExecuteScriptWithLargeStrings) {
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/kqprun/src/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ class TKqpRunner::TImpl {
return false;
}

if (!status.Issues.Empty()) {
Cerr << CerrColors_.Red() << "Forget operation finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
}

return true;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ class TYdbSetup::TImpl {

NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse::TPtr ForgetScriptExecutionOperationRequest(const TString& operation) const {
NKikimr::NOperationId::TOperationId operationId(operation);
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId, TInstant::Max());
auto event = MakeHolder<NKikimr::NKqp::TEvForgetScriptExecutionOperation>(Settings_.DomainName, operationId);

return RunKqpProxyRequest<NKikimr::NKqp::TEvForgetScriptExecutionOperation, NKikimr::NKqp::TEvForgetScriptExecutionOperationResponse>(std::move(event));
}
Expand Down

0 comments on commit b219a3c

Please sign in to comment.