From 998409c3ee884e2d4e993060f00ba1893249d976 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jan 2024 16:06:56 +0000 Subject: [PATCH 1/5] Fixed uncaught exception in TS3FileWriteActor --- .../providers/s3/actors/yql_s3_write_actor.cpp | 18 +++++++----------- ydb/tests/tools/kqprun/kqprun.cpp | 11 +++++++++++ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 6185dc1e0282..94df6cd178b8 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -139,7 +139,7 @@ class TS3FileWriteActor : public TActorBootstrapped { const TString& token) : TxId(txId) , Gateway(std::move(gateway)) - , Credentials(std::move(crdentials)) + , AuthInfo(crdentials.GetAuthInfo()) , RetryPolicy(retryPolicy) , ActorSystem(TActivationContext::ActorSystem()) , Key(key) @@ -155,14 +155,13 @@ class TS3FileWriteActor : public TActorBootstrapped { void Bootstrap(const TActorId& parentId) { ParentId = parentId; LOG_D("TS3FileWriteActor", "Bootstrap by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "], request id: [" << RequestId << "]"); - auto authInfo = Credentials.GetAuthInfo(); if (DirtyWrite && Parts->IsSealed() && Parts->Size() <= 1) { Become(&TS3FileWriteActor::SinglepartWorkingStateFunc); const size_t size = Max(Parts->Volume(), 1); InFlight += size; SentSize += size; Gateway->Upload(Url, - IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, RequestId, size, std::placeholders::_1), true, @@ -170,7 +169,7 @@ class TS3FileWriteActor : public TActorBootstrapped { } else { Become(&TS3FileWriteActor::MultipartInitialStateFunc); Gateway->Upload(Url + "?uploads", - IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, RequestId, std::placeholders::_1), false, @@ -367,9 +366,8 @@ class TS3FileWriteActor : public TActorBootstrapped { Tags.emplace_back(); InFlight += size; SentSize += size; - auto authInfo = Credentials.GetAuthInfo(); Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, - IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), std::move(part), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, RequestId, std::placeholders::_1), true, @@ -395,9 +393,8 @@ class TS3FileWriteActor : public TActorBootstrapped { for (const auto& tag : Tags) xml << "" << ++i << "" << tag << "" << Endl; xml << "" << Endl; - auto authInfo = Credentials.GetAuthInfo(); Gateway->Upload(Url + "?uploadId=" + UploadId, - IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), "application/xml", authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), "application/xml", AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, RequestId, SentSize, std::placeholders::_1), false, @@ -412,9 +409,8 @@ class TS3FileWriteActor : public TActorBootstrapped { return; } - auto authInfo = Credentials.GetAuthInfo(); Gateway->Delete(Url + "?uploadId=" + UploadId, - IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), "application/xml", authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), "application/xml", AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), std::bind(&TS3FileWriteActor::OnMultipartUploadAbort, ActorSystem, SelfId(), TxId, RequestId, std::placeholders::_1), RetryPolicy); UploadId.clear(); @@ -425,7 +421,7 @@ class TS3FileWriteActor : public TActorBootstrapped { const TTxId TxId; const IHTTPGateway::TPtr Gateway; - const TS3Credentials Credentials; + const TS3Credentials::TAuthInfo AuthInfo; const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; TActorSystem* const ActorSystem; diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 7d91b3977e7f..5b250f2734da 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -232,8 +232,19 @@ void RunMain(int argc, const char* argv[]) { RunScript(executionOptions, runnerOptions); } +void KqprunTerminateHandler() { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cerr << colors.Red() << "======= terminate() call stack ========" << colors.Default() << Endl; + FormatBackTrace(&Cerr); + Cerr << colors.Red() << "=======================================" << colors.Default() << Endl; + + abort(); +} int main(int argc, const char* argv[]) { + std::set_terminate(KqprunTerminateHandler); + try { RunMain(argc, argv); } catch (...) { From 6a697fe663f9be62b2b6834a6b46e578efcf31bf Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jan 2024 16:55:31 +0000 Subject: [PATCH 2/5] Added try catch --- .../s3/actors/yql_s3_write_actor.cpp | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 94df6cd178b8..dced02c81e9c 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -139,7 +139,7 @@ class TS3FileWriteActor : public TActorBootstrapped { const TString& token) : TxId(txId) , Gateway(std::move(gateway)) - , AuthInfo(crdentials.GetAuthInfo()) + , Credentials(std::move(crdentials)) , RetryPolicy(retryPolicy) , ActorSystem(TActivationContext::ActorSystem()) , Key(key) @@ -155,6 +155,9 @@ class TS3FileWriteActor : public TActorBootstrapped { void Bootstrap(const TActorId& parentId) { ParentId = parentId; LOG_D("TS3FileWriteActor", "Bootstrap by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "], request id: [" << RequestId << "]"); + if (!UpdateAuthInfo()) { + return; + } if (DirtyWrite && Parts->IsSealed() && Parts->Size() <= 1) { Become(&TS3FileWriteActor::SinglepartWorkingStateFunc); const size_t size = Max(Parts->Volume(), 1); @@ -359,6 +362,17 @@ class TS3FileWriteActor : public TActorBootstrapped { } } + bool UpdateAuthInfo() { + try { + AuthInfo = Credentials.GetAuthInfo(); + return true; + } + catch (const yexception& ex) { + Send(ParentId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::BAD_REQUEST, TStringBuilder() << "Failed to get auth info: " << ex.what())); + return false; + } + } + void StartUploadParts() { while (auto part = Parts->Pop()) { const auto size = part.size(); @@ -366,6 +380,9 @@ class TS3FileWriteActor : public TActorBootstrapped { Tags.emplace_back(); InFlight += size; SentSize += size; + if (!UpdateAuthInfo()) { + return; + } Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), std::move(part), @@ -393,6 +410,9 @@ class TS3FileWriteActor : public TActorBootstrapped { for (const auto& tag : Tags) xml << "" << ++i << "" << tag << "" << Endl; xml << "" << Endl; + if (!UpdateAuthInfo()) { + return; + } Gateway->Upload(Url + "?uploadId=" + UploadId, IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), "application/xml", AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), xml, @@ -409,6 +429,9 @@ class TS3FileWriteActor : public TActorBootstrapped { return; } + if (!UpdateAuthInfo()) { + return; + } Gateway->Delete(Url + "?uploadId=" + UploadId, IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), "application/xml", AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), std::bind(&TS3FileWriteActor::OnMultipartUploadAbort, ActorSystem, SelfId(), TxId, RequestId, std::placeholders::_1), @@ -421,7 +444,7 @@ class TS3FileWriteActor : public TActorBootstrapped { const TTxId TxId; const IHTTPGateway::TPtr Gateway; - const TS3Credentials::TAuthInfo AuthInfo; + const TS3Credentials Credentials; const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; TActorSystem* const ActorSystem; @@ -437,6 +460,7 @@ class TS3FileWriteActor : public TActorBootstrapped { TString UploadId; bool DirtyWrite; TString Token; + TS3Credentials::TAuthInfo AuthInfo; }; class TS3WriteActor : public TActorBootstrapped, public IDqComputeActorAsyncOutput { From bb5d6a7e647e9d273261a518274beed497a782bc Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 23 Jan 2024 16:57:16 +0000 Subject: [PATCH 3/5] Added AuthInfo initialization --- ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index dced02c81e9c..b37f924701a6 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -148,6 +148,7 @@ class TS3FileWriteActor : public TActorBootstrapped { , Parts(MakeCompressorQueue(compression)) , DirtyWrite(dirtyWrite) , Token(token) + , AuthInfo(Credentials.GetAuthInfo()) { YQL_ENSURE(Parts, "Compression '" << compression << "' is not supported."); } @@ -155,9 +156,6 @@ class TS3FileWriteActor : public TActorBootstrapped { void Bootstrap(const TActorId& parentId) { ParentId = parentId; LOG_D("TS3FileWriteActor", "Bootstrap by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "], request id: [" << RequestId << "]"); - if (!UpdateAuthInfo()) { - return; - } if (DirtyWrite && Parts->IsSealed() && Parts->Size() <= 1) { Become(&TS3FileWriteActor::SinglepartWorkingStateFunc); const size_t size = Max(Parts->Volume(), 1); From e0f4f232977bf35d6e51cb55faef99922312a98a Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 24 Jan 2024 08:09:09 +0000 Subject: [PATCH 4/5] Removed field AuthInfo --- .../s3/actors/yql_s3_write_actor.cpp | 76 +++++++++++-------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index b37f924701a6..5681a772ee39 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -148,7 +148,6 @@ class TS3FileWriteActor : public TActorBootstrapped { , Parts(MakeCompressorQueue(compression)) , DirtyWrite(dirtyWrite) , Token(token) - , AuthInfo(Credentials.GetAuthInfo()) { YQL_ENSURE(Parts, "Compression '" << compression << "' is not supported."); } @@ -156,21 +155,30 @@ class TS3FileWriteActor : public TActorBootstrapped { void Bootstrap(const TActorId& parentId) { ParentId = parentId; LOG_D("TS3FileWriteActor", "Bootstrap by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "], request id: [" << RequestId << "]"); + try { + BeginPartsUpload(Credentials.GetAuthInfo()); + } + catch (const yexception& ex) { + FailOnException(); + } + } + + void BeginPartsUpload(const TS3Credentials::TAuthInfo& authInfo) { if (DirtyWrite && Parts->IsSealed() && Parts->Size() <= 1) { - Become(&TS3FileWriteActor::SinglepartWorkingStateFunc); + Become(&TS3FileWriteActor::StateFuncWrapper<&TS3FileWriteActor::SinglepartWorkingStateFunc>); const size_t size = Max(Parts->Volume(), 1); InFlight += size; SentSize += size; Gateway->Upload(Url, - IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), Parts->Pop(), std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, RequestId, size, std::placeholders::_1), true, RetryPolicy); } else { - Become(&TS3FileWriteActor::MultipartInitialStateFunc); + Become(&TS3FileWriteActor::StateFuncWrapper<&TS3FileWriteActor::MultipartInitialStateFunc>); Gateway->Upload(Url + "?uploads", - IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), 0, std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, RequestId, std::placeholders::_1), false, @@ -186,7 +194,7 @@ class TS3FileWriteActor : public TActorBootstrapped { void PassAway() override { if (InFlight || !Parts->Empty()) { - AbortMultipartUpload(); + SafeAbortMultipartUpload(); LOG_W("TS3FileWriteActor", "PassAway: but NOT finished, InFlight: " << InFlight << ", Parts: " << Parts->Size() << ", Sealed: " << Parts->IsSealed() << ", request id: [" << RequestId << "]"); } else { LOG_D("TS3FileWriteActor", "PassAway: request id: [" << RequestId << "]"); @@ -236,6 +244,15 @@ class TS3FileWriteActor : public TActorBootstrapped { return InFlight + Parts->Volume(); } private: + template + STFUNC(StateFuncWrapper) { + try { + (this->*DelegatedStateFunc)(ev); + } catch (...) { + FailOnException(); + } + } + STRICT_STFUNC(MultipartInitialStateFunc, hFunc(TEvPrivate::TEvUploadStarted, Handle); ) @@ -347,7 +364,7 @@ class TS3FileWriteActor : public TActorBootstrapped { void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) { UploadId = result->Get()->UploadId; - Become(&TS3FileWriteActor::MultipartWorkingStateFunc); + Become(&TS3FileWriteActor::StateFuncWrapper<&TS3FileWriteActor::MultipartWorkingStateFunc>); StartUploadParts(); } @@ -360,17 +377,6 @@ class TS3FileWriteActor : public TActorBootstrapped { } } - bool UpdateAuthInfo() { - try { - AuthInfo = Credentials.GetAuthInfo(); - return true; - } - catch (const yexception& ex) { - Send(ParentId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::BAD_REQUEST, TStringBuilder() << "Failed to get auth info: " << ex.what())); - return false; - } - } - void StartUploadParts() { while (auto part = Parts->Pop()) { const auto size = part.size(); @@ -378,11 +384,9 @@ class TS3FileWriteActor : public TActorBootstrapped { Tags.emplace_back(); InFlight += size; SentSize += size; - if (!UpdateAuthInfo()) { - return; - } + auto authInfo = Credentials.GetAuthInfo(); Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, - IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), std::move(part), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, RequestId, std::placeholders::_1), true, @@ -408,18 +412,25 @@ class TS3FileWriteActor : public TActorBootstrapped { for (const auto& tag : Tags) xml << "" << ++i << "" << tag << "" << Endl; xml << "" << Endl; - if (!UpdateAuthInfo()) { - return; - } + auto authInfo = Credentials.GetAuthInfo(); Gateway->Upload(Url + "?uploadId=" + UploadId, - IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), "application/xml", AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), "application/xml", authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), xml, std::bind(&TS3FileWriteActor::OnMultipartUploadFinish, ActorSystem, SelfId(), ParentId, Key, Url, RequestId, SentSize, std::placeholders::_1), false, RetryPolicy); } - void AbortMultipartUpload() { + void SafeAbortMultipartUpload() { + try { + AbortMultipartUpload(Credentials.GetAuthInfo()); + } + catch (const yexception& ex) { + LOG_W("TS3FileWriteActor", "Failed to abort multipart upload, error: " << CurrentExceptionMessage()); + } + } + + void AbortMultipartUpload(const TS3Credentials::TAuthInfo& authInfo) { // Try to abort multipart upload in case of unexpected termination. // In case of error just logs warning. @@ -427,16 +438,18 @@ class TS3FileWriteActor : public TActorBootstrapped { return; } - if (!UpdateAuthInfo()) { - return; - } Gateway->Delete(Url + "?uploadId=" + UploadId, - IHTTPGateway::MakeYcHeaders(RequestId, AuthInfo.GetToken(), "application/xml", AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(RequestId, authInfo.GetToken(), "application/xml", authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), std::bind(&TS3FileWriteActor::OnMultipartUploadAbort, ActorSystem, SelfId(), TxId, RequestId, std::placeholders::_1), RetryPolicy); UploadId.clear(); } + void FailOnException() { + Send(ParentId, new TEvPrivate::TEvUploadError(NYql::NDqProto::StatusIds::BAD_REQUEST, CurrentExceptionMessage())); + SafeAbortMultipartUpload(); + } + size_t InFlight = 0ULL; size_t SentSize = 0ULL; @@ -458,7 +471,6 @@ class TS3FileWriteActor : public TActorBootstrapped { TString UploadId; bool DirtyWrite; TString Token; - TS3Credentials::TAuthInfo AuthInfo; }; class TS3WriteActor : public TActorBootstrapped, public IDqComputeActorAsyncOutput { From 7a9678db1081510b5475e823129cc7ce8b59fa6d Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Wed, 24 Jan 2024 14:07:59 +0000 Subject: [PATCH 5/5] Fixed catch scope --- ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 5681a772ee39..46bf335802c0 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -157,8 +157,7 @@ class TS3FileWriteActor : public TActorBootstrapped { LOG_D("TS3FileWriteActor", "Bootstrap by " << ParentId << " for Key: [" << Key << "], Url: [" << Url << "], request id: [" << RequestId << "]"); try { BeginPartsUpload(Credentials.GetAuthInfo()); - } - catch (const yexception& ex) { + } catch (...) { FailOnException(); } } @@ -424,8 +423,7 @@ class TS3FileWriteActor : public TActorBootstrapped { void SafeAbortMultipartUpload() { try { AbortMultipartUpload(Credentials.GetAuthInfo()); - } - catch (const yexception& ex) { + } catch (...) { LOG_W("TS3FileWriteActor", "Failed to abort multipart upload, error: " << CurrentExceptionMessage()); } }