Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ydb/library/yql/providers/dq/actors/yt/lock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ struct TLockRequest: public TActor<TLockRequest> {
auto* actorSystem = ctx.ExecutorThread.ActorSystem;
auto selfId = SelfId();
try {
Transaction->LockNode("#" + ToString(result.ValueOrThrow()), NYT::NCypressClient::ELockMode::Exclusive, options).As<void>()
YT_UNUSED_FUTURE(Transaction->LockNode("#" + ToString(result.ValueOrThrow()), NYT::NCypressClient::ELockMode::Exclusive, options).As<void>()
.Apply(BIND([actorSystem, selfId](const NYT::TErrorOr<void>& result) {
actorSystem->Send(selfId, new TEvSetNodeResponse(0, result));
}));
})));
} catch (...) {
Finish(ctx);
}
Expand All @@ -108,7 +108,7 @@ struct TLockRequest: public TActor<TLockRequest> {
}

void PassAway() override {
Transaction->Abort();
YT_UNUSED_FUTURE(Transaction->Abort());
Transaction.Reset();
Send(LockActorId, new TEvTick());
IActor::PassAway();
Expand Down Expand Up @@ -248,13 +248,13 @@ struct TLockRequest: public TActor<TLockRequest> {
try {
auto* actorSystem = ctx.ExecutorThread.ActorSystem;
auto selfId = SelfId();
Transaction->CreateNode(
YT_UNUSED_FUTURE(Transaction->CreateNode(
lockNode,
NYT::NObjectClient::EObjectType::StringNode,
NYT::NApi::TCreateNodeOptions())
.Apply(BIND([actorSystem, selfId](const NYT::TErrorOr<NYT::NCypressClient::TNodeId>& result) {
actorSystem->Send(selfId, new TEvCreateNodeResponse(0, result));
}));
})));
} catch (...) {
Finish(ctx);
}
Expand Down
56 changes: 28 additions & 28 deletions ydb/library/yql/providers/dq/actors/yt/yt_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ namespace NYql {
req->Digest = digest;
}

Client->GetNode(nodePath + "/@md5")
YT_UNUSED_FUTURE(Client->GetNode(nodePath + "/@md5")
.Apply(BIND([request, attributes, digest](const TErrorOr<NYT::NYson::TYsonString>& err) mutable {
auto req = request.Lock();
if (!req) {
Expand All @@ -302,10 +302,10 @@ namespace NYql {
if (err.IsOK() && digest == NYTree::ConvertTo<TString>(err.Value())) {
YQL_CLOG(INFO, ProviderDq) << "File already uploaded";
try {
req->Client->SetNode(req->NodePath + "/@yql_last_update",
YT_UNUSED_FUTURE(req->Client->SetNode(req->NodePath + "/@yql_last_update",
NYT::NYson::TYsonString(
NYT::NodeToYsonString(NYT::TNode(ToString(TInstant::Now()))
)));
))));
} catch (...) { }
return VoidFuture;
} else if (err.IsOK() || err.FindMatching(NYT::NYTree::EErrorCode::ResolveError)) {
Expand Down Expand Up @@ -363,7 +363,7 @@ namespace NYql {
if (auto req = request.Lock()) {
req->Complete(new TEvWriteFileResponse(requestId, err));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvWriteFileResponse(requestId, ex));
Expand All @@ -385,7 +385,7 @@ namespace NYql {

auto nodePath = remotePath.GetPath();

Client->GetNode(nodePath + "/@md5")
YT_UNUSED_FUTURE(Client->GetNode(nodePath + "/@md5")
.Apply(BIND([request, nodePath, readerOptions](const TErrorOr<NYT::NYson::TYsonString>& err) mutable {
auto req = request.Lock();
if (!req) {
Expand All @@ -411,7 +411,7 @@ namespace NYql {
if (auto req = request.Lock()) {
req->Complete(new TEvReadFileResponse(requestId, err));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvReadFileResponse(requestId, ex));
Expand Down Expand Up @@ -448,14 +448,14 @@ namespace NYql {
auto operationId = std::get<0>(*ev->Get());
auto options = std::get<1>(*ev->Get());

Client->GetOperation(operationId, options).Apply(BIND([=](const TErrorOr<TOperation>& result) {
YT_UNUSED_FUTURE(Client->GetOperation(operationId, options).Apply(BIND([=](const TErrorOr<TOperation>& result) {
return NYT::NYson::ConvertToYsonString(result.ValueOrThrow()).ToString();
}))
.Apply(BIND([=](const TErrorOr<TString>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetOperationResponse(requestId, result));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetOperationResponse(requestId, ex));
Expand All @@ -470,11 +470,11 @@ namespace NYql {
try {
auto options = std::get<0>(*ev->Get());

Client->ListOperations(options).Apply(BIND([=](const TErrorOr<NYT::NApi::TListOperationsResult>& result) {
YT_UNUSED_FUTURE(Client->ListOperations(options).Apply(BIND([=](const TErrorOr<NYT::NApi::TListOperationsResult>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvListOperationsResponse(requestId, result));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvListOperationsResponse(requestId, ex));
Expand All @@ -491,14 +491,14 @@ namespace NYql {
auto jobId = std::get<1>(*ev->Get());
auto options = std::get<2>(*ev->Get());

Client->GetJob(operationId, jobId, options).Apply(BIND([=](const TErrorOr<NYT::NYson::TYsonString>& result) {
YT_UNUSED_FUTURE(Client->GetJob(operationId, jobId, options).Apply(BIND([=](const TErrorOr<NYT::NYson::TYsonString>& result) {
return result.ValueOrThrow().ToString();
}))
.Apply(BIND([=](const TErrorOr<TString>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetJobResponse(requestId, result));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetJobResponse(requestId, ex));
Expand All @@ -513,15 +513,15 @@ namespace NYql {
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

try {
Client->ListNode(path, options)
YT_UNUSED_FUTURE(Client->ListNode(path, options)
.Apply(BIND([=](const TErrorOr<NYT::NYson::TYsonString>& result) {
return result.ValueOrThrow().ToString();
}))
.Apply(BIND([=](const TErrorOr<TString>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvListNodeResponse(requestId, result));
}
}));
})));
} catch (const std::exception& ex) {
if (auto req = request.Lock()) {
req->Complete(new TEvListNodeResponse(requestId, ex));
Expand All @@ -536,12 +536,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->SetNode(path, value, options)
YT_UNUSED_FUTURE(Client->SetNode(path, value, options)
.Apply(BIND([=](const TErrorOr<void>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvSetNodeResponse(requestId, result));
}
}));
})));
}

void OnGetNode(TEvGetNode::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -550,12 +550,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->GetNode(path, options)
YT_UNUSED_FUTURE(Client->GetNode(path, options)
.Apply(BIND([=](const TErrorOr<NYT::NYson::TYsonString>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvGetNodeResponse(requestId, result));
}
}));
})));
}

void OnRemoveNode(TEvRemoveNode::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -564,12 +564,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->RemoveNode(path, options)
YT_UNUSED_FUTURE(Client->RemoveNode(path, options)
.Apply(BIND([=](const TErrorOr<void>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvRemoveNodeResponse(requestId, result));
}
}));
})));
}

void OnCreateNode(TEvCreateNode::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -579,12 +579,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->CreateNode(path, type, options)
YT_UNUSED_FUTURE(Client->CreateNode(path, type, options)
.Apply(BIND([=](const TErrorOr<NYT::NCypressClient::TNodeId>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvCreateNodeResponse(requestId, result));
}
}));
})));
}

void OnStartTransaction(TEvStartTransaction::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -593,12 +593,12 @@ namespace NYql {
auto requestId = ev->Get()->RequestId;
auto request = NewRequest<TRequest>(requestId, ev->Sender, ctx);

Client->StartTransaction(type, options)
YT_UNUSED_FUTURE(Client->StartTransaction(type, options)
.Apply(BIND([=](const TErrorOr<ITransactionPtr>& result) {
if (auto req = request.Lock()) {
req->Complete(new TEvStartTransactionResponse(requestId, result));
}
}));
})));
}

void OnPrintJobStderr(TEvPrintJobStderr::TPtr& ev, const TActorContext& ctx) {
Expand All @@ -607,23 +607,23 @@ namespace NYql {

YQL_CLOG(DEBUG, ProviderDq) << "Printing stderr of operation " << ToString(operationId);

Client->ListJobs(operationId)
YT_UNUSED_FUTURE(Client->ListJobs(operationId)
.Apply(BIND([operationId, client = MakeWeak(Client)](const TListJobsResult& result) {
if (auto cli = client.Lock()) {
for (const auto& job : result.Jobs) {
YQL_CLOG(DEBUG, ProviderDq) << "Printing stderr (" << ToString(operationId) << "," << ToString(job.Id) << ")";

cli->GetJobStderr(operationId, job.Id)
YT_UNUSED_FUTURE(cli->GetJobStderr(operationId, job.Id)
.Apply(BIND([jobId = job.Id, operationId](const TSharedRef& data) {
YQL_CLOG(DEBUG, ProviderDq)
<< "Stderr ("
<< ToString(operationId) << ","
<< ToString(jobId) << ")"
<< TString(data.Begin(), data.Size());
}));
})));
}
}
}));
})));
}

IClientPtr Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ TVector<TMaybe<NYT::TNode>> InferSchemaFromTablesContents(const TString& cluster
inferers.reserve(requests.size());

std::function<void(size_t)> runRead = [&](size_t i) {
inputs[i]->Read().ApplyUnique(BIND([&inferers, &promises, &runRead, i = i](NYT::TErrorOr<NYT::TSharedRef>&& res){
YT_UNUSED_FUTURE(inputs[i]->Read().ApplyUnique(BIND([&inferers, &promises, &runRead, i = i](NYT::TErrorOr<NYT::TSharedRef>&& res){
if (res.IsOK() && !res.Value()) {
// EOS
promises[i].Set();
Expand Down Expand Up @@ -75,7 +75,7 @@ TVector<TMaybe<NYT::TNode>> InferSchemaFromTablesContents(const TString& cluster
promises[i].Set(e);
}
runRead(i);
}));
})));
};

futures.reserve(requests.size());
Expand Down Expand Up @@ -107,13 +107,13 @@ TVector<TMaybe<NYT::TNode>> InferSchemaFromTablesContents(const TString& cluster
request->set_format("<format=text>yson");
promises.push_back(NYT::NewPromise<void>());
futures.push_back(promises.back().ToFuture());
CreateRpcClientInputStream(std::move(request)).ApplyUnique(BIND([&runRead, &inputs, i](NYT::NConcurrency::IAsyncZeroCopyInputStreamPtr&& stream) {
YT_UNUSED_FUTURE(CreateRpcClientInputStream(std::move(request)).ApplyUnique(BIND([&runRead, &inputs, i](NYT::NConcurrency::IAsyncZeroCopyInputStreamPtr&& stream) {
// first packet contains meta, skip it
return stream->Read().ApplyUnique(BIND([&runRead, stream = std::move(stream), i, &inputs](NYT::TSharedRef&&) {
inputs[i] = std::move(stream);
runRead(i);
}));
}));
})));
++i;
}
YQL_ENSURE(NYT::NConcurrency::WaitFor(AllSucceeded(futures)).IsOK(), "Excepted all promises to be resolved in InferSchemaFromTablesContents");
Expand Down