Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ struct TEvKqpExecuter {
ui64 ResultRowsCount = 0;
ui64 ResultRowsBytes = 0;

explicit TEvTxResponse(TTxAllocatorState::TPtr allocState)
enum class EExecutionType {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А enum из протобуфа ты решил не использовать?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А там кажется нет прям идеально подходящего по семантике.
в TKqpPhyQuery нет Scheme и он больше про запрос (есть отдельно SCRIPT)
TKqpPhyTx в принципе можно, но TKqpExecuterBase уже принимал enum ExecType вторым аргументом который прям хорошо ложился.

Наверно можно порефакторить чтоб использовать везде один enum (в том числе и в TKqpExecuterBase) но я что то не уверен сходу...

Data,
Scan,
Scheme,
Literal,
} ExecutionType;

TEvTxResponse(TTxAllocatorState::TPtr allocState, EExecutionType type)
: AllocState(std::move(allocState))
, ExecutionType(type)
{}

~TEvTxResponse();
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ namespace NKqp {
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)
#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "Ctx: " << *GetUserRequestContext() << ". " << stream)

enum class EExecType {
Data,
Scan
};
using EExecType = TEvKqpExecuter::TEvTxResponse::EExecutionType;

const ui64 MaxTaskSize = 48_MB;

Expand Down Expand Up @@ -114,6 +111,7 @@ struct TEvPrivate {

template <class TDerived, EExecType ExecType>
class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
static_assert(ExecType == EExecType::Data || ExecType == EExecType::Scan);
public:
TKqpExecuterBase(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
Expand Down Expand Up @@ -141,7 +139,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
TasksGraph.GetMeta().Database = Database;
TasksGraph.GetMeta().ChannelTransportVersion = chanTransportVersion;
TasksGraph.GetMeta().UserRequestContext = userRequestContext;
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc, ExecType);
ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ class TKqpLiteralExecuter {
, LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter")
, UserRequestContext(userRequestContext)
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(
Request.TxAlloc, TEvKqpExecuter::TEvTxResponse::EExecutionType::Literal);

ResponseEv->Orbit = std::move(Request.Orbit);
Stats = std::make_unique<TQueryExecutionStats>(Request.StatsMode, &TasksGraph,
ResponseEv->Record.MutableResponse()->MutableResult()->MutableStats());
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
YQL_ENSURE(PhyTx);
YQL_ENSURE(PhyTx->GetType() == NKqpProto::TKqpPhyTx::TYPE_SCHEME);

ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(nullptr);
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(
nullptr,
TEvKqpExecuter::TEvTxResponse::EExecutionType::Scheme);
}

void StartBuildOperation() {
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/kqp/executer_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ PEERDIR(
ydb/library/yql/providers/common/http_gateway
)

GENERATE_ENUM_SERIALIZATION(
kqp_executer.h
)

YQL_LAST_ABI_VERSION()

END()
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1336,7 +1336,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
ExecuterId = TActorId{};

if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
LOG_D("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx);
const auto executionType = ev->ExecutionType;

LOG_D("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx
<< " ExecutionType: " << executionType);

auto status = response->GetStatus();
TIssues issues;
Expand All @@ -1348,11 +1351,14 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
case Ydb::StatusIds::INTERNAL_ERROR:
InvalidateQuery();
issues.AddIssue(YqlIssue(TPosition(), TIssuesIds::KIKIMR_QUERY_INVALIDATED,
TStringBuilder() << "Query invalidated on scheme/internal error."));
TStringBuilder() << "Query invalidated on scheme/internal error during "
<< executionType << " execution"));

// SCHEME_ERROR during execution is a soft (retriable) error, we abort query execution,
// invalidate query cache, and return ABORTED as retriable status.
if (status == Ydb::StatusIds::SCHEME_ERROR) {
if (status == Ydb::StatusIds::SCHEME_ERROR &&
executionType != TEvKqpExecuter::TEvTxResponse::EExecutionType::Scheme)
{
status = Ydb::StatusIds::ABORTED;
}

Expand Down
13 changes: 13 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,19 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}

Y_UNIT_TEST(ExecuteDDLStatusCodeSchemeError) {
TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false));
{
auto db = kikimr.GetQueryClient();
auto result = db.ExecuteQuery(R"(
CREATE TABLE unsupported_TzTimestamp (key Int32, payload TzTimestamp, primary key(key)))",
TTxControl::NoTx()
).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
}
}

Y_UNIT_TEST(ExecuteQueryScalar) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
Expand Down