diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp index dd7a44754015..e4b2d63f554f 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.cpp @@ -1,6 +1,8 @@ #include "dq_yt_rpc_reader.h" #include "dq_yt_rpc_helpers.h" +#include + #include "yt/cpp/mapreduce/common/helpers.h" #include @@ -84,6 +86,13 @@ void TParallelFileInputState::Finish() { MkqlReader_.Finish(); } +void TParallelFileInputState::CheckError() const { + if (!InnerState_->Error.IsOK()) { + Cerr << "YT RPC Reader exception:\n"; + InnerState_->Error.ThrowOnError(); + } +} + bool TParallelFileInputState::RunNext() { while (true) { size_t InputIdx = 0; @@ -120,6 +129,11 @@ bool TParallelFileInputState::RunNext() { Cerr << (TStringBuilder() "Warn: request took: " << elapsed << " mcs)\n"); } #endif + + TFailureInjector::Reach("dq_rpc_reader_read_err_when_empty", [&res_] { + res_ = NYT::TErrorOr(NYT::TError("failure injected")); + }); + if (!res_.IsOK()) { std::lock_guard lock(state->Lock); state->Error = std::move(res_); @@ -127,6 +141,7 @@ bool TParallelFileInputState::RunNext() { state->WaitPromise.TrySet(); return; } + auto block = std::move(res_.Value()); NYT::NApi::NRpcProxy::NProto::TRowsetDescriptor descriptor; NYT::NApi::NRpcProxy::NProto::TRowsetStatistics statistics; @@ -160,6 +175,7 @@ bool TParallelFileInputState::NextValue() { #ifdef RPC_PRINT_TIME print_add(-1); #endif + CheckError(); return false; } if (MkqlReader_.IsValid()) { @@ -186,10 +202,7 @@ bool TParallelFileInputState::NextValue() { TResult result; { std::lock_guard lock(InnerState_->Lock); - if (!InnerState_->Error.IsOK()) { - Cerr << "YT RPC Reader exception:\n"; - InnerState_->Error.ThrowOnError(); - } + CheckError(); if (InnerState_->Results.empty()) { continue; } diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.h b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.h index 0219ac564506..1cd8bda60294 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.h +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_reader.h @@ -45,6 +45,7 @@ struct TReaderState { bool NextValue(); private: + void CheckError() const; // Used to pass struct in lambdas. std::shared_ptr copying is thread-safe struct TInnerState { TInnerState(size_t inputsCount) : IsInputDone(inputsCount) {}; diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make b/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make index cba05d599a94..7bdc790740ad 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make @@ -6,6 +6,7 @@ PEERDIR( ydb/library/yql/providers/yt/comp_nodes ydb/library/yql/providers/yt/codec ydb/library/yql/providers/common/codec + ydb/library/yql/utils/failure_injector yt/cpp/mapreduce/interface yt/cpp/mapreduce/common library/cpp/yson/node