Skip to content

Commit a2af99e

Browse files
committed
Add handling if spilling service is not started.
TDqLocalFileSpillingActor sends events with flag to traack undelivery. Add unit tests for this. Fix LOG macroses in channel_storage_actor.cpp to handle ActorSystem pointer also.
1 parent 40f9120 commit a2af99e

File tree

3 files changed

+90
-23
lines changed

3 files changed

+90
-23
lines changed

ydb/library/yql/dq/actors/spilling/channel_storage_actor.cpp

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,53 @@ using namespace NActors;
1818

1919
namespace {
2020

21-
#define LOG_D(s) \
22-
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
23-
#define LOG_I(s) \
24-
LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
25-
#define LOG_E(s) \
26-
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
27-
#define LOG_C(s) \
28-
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
29-
#define LOG_W(s) \
30-
LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s)
31-
#define LOG_T(s) \
32-
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s)
21+
#define LOG_D(s) { \
22+
if (ActorSystem_) { \
23+
LOG_DEBUG_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
24+
} else { \
25+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
26+
} \
27+
}
28+
29+
#define LOG_I(s) { \
30+
if (ActorSystem_) { \
31+
LOG_INFO_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
32+
} else { \
33+
LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
34+
} \
35+
}
36+
37+
#define LOG_E(s) { \
38+
if (ActorSystem_) { \
39+
LOG_ERROR_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
40+
} else { \
41+
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
42+
} \
43+
}
44+
45+
#define LOG_C(s) { \
46+
if (ActorSystem_) { \
47+
LOG_CRIT_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
48+
} else { \
49+
LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
50+
} \
51+
}
52+
53+
#define LOG_W(s) { \
54+
if (ActorSystem_) { \
55+
LOG_WARN_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
56+
} else { \
57+
LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId << ", channelId: " << ChannelId << ". " << s); \
58+
} \
59+
}
60+
61+
#define LOG_T(s) { \
62+
if (ActorSystem_) { \
63+
LOG_TRACE_S(*ActorSystem_, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
64+
} else { \
65+
LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_COMPUTE, "TxId: " << TxId_ << ", channelId: " << ChannelId_ << ". " << s); \
66+
} \
67+
}
3368

3469
constexpr ui32 MAX_INFLIGHT_BLOBS_COUNT = 10;
3570
constexpr ui64 MAX_INFLIGHT_BLOBS_SIZE = 50_MB;

ydb/library/yql/dq/actors/spilling/spilling_file.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class TDqLocalFileSpillingActor : public TActorBootstrapped<TDqLocalFileSpilling
7777
YQL_ENSURE(ServiceActorId_);
7878

7979
LOG_D("Register LocalFileSpillingActor " << SelfId() << " at service " << ServiceActorId_);
80-
Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvOpenFile(TxId_, Details_, RemoveBlobsAfterRead_));
80+
Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvOpenFile(TxId_, Details_, RemoveBlobsAfterRead_), NActors::IEventHandle::FlagTrackDelivery);
8181

8282
Become(&TDqLocalFileSpillingActor::WorkState);
8383
}
@@ -91,13 +91,14 @@ class TDqLocalFileSpillingActor : public TActorBootstrapped<TDqLocalFileSpilling
9191
hFunc(TEvDqSpilling::TEvRead, HandleWork)
9292
hFunc(TEvDqSpilling::TEvReadResult, HandleWork)
9393
hFunc(TEvDqSpilling::TEvError, HandleWork)
94+
sFunc(NActors::TEvents::TEvUndelivered, HandleUndelivered)
9495
hFunc(TEvents::TEvPoison, HandleWork)
9596
);
9697

9798
void HandleWork(TEvDqSpilling::TEvWrite::TPtr& ev) {
9899
ValidateSender(ev->Sender);
99100

100-
Send(ServiceActorId_, ev->Release().Release());
101+
Send(ServiceActorId_, ev->Release().Release(), NActors::IEventHandle::FlagTrackDelivery);
101102
}
102103

103104
void HandleWork(TEvDqSpilling::TEvWriteResult::TPtr& ev) {
@@ -109,7 +110,7 @@ class TDqLocalFileSpillingActor : public TActorBootstrapped<TDqLocalFileSpilling
109110
void HandleWork(TEvDqSpilling::TEvRead::TPtr& ev) {
110111
ValidateSender(ev->Sender);
111112

112-
Send(ServiceActorId_, ev->Release().Release());
113+
Send(ServiceActorId_, ev->Release().Release(), NActors::IEventHandle::FlagTrackDelivery);
113114
}
114115

115116
void HandleWork(TEvDqSpilling::TEvReadResult::TPtr& ev) {
@@ -125,17 +126,21 @@ class TDqLocalFileSpillingActor : public TActorBootstrapped<TDqLocalFileSpilling
125126
void HandleWork(TEvents::TEvPoison::TPtr& ev) {
126127
ValidateSender(ev->Sender);
127128

128-
Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile);
129+
Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile, NActors::IEventHandle::FlagTrackDelivery);
129130
PassAway();
130131
}
131132

133+
void HandleUndelivered() {
134+
Send(ClientActorId_, new TEvDqSpilling::TEvError("Spilling Service not started"));
135+
}
136+
132137
private:
133138
void ValidateSender(const TActorId& sender) {
134139
YQL_ENSURE(ClientActorId_ == sender, "" << ClientActorId_ << " != " << sender);
135140
}
136141

137142
void ClientLost() {
138-
Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile("Client lost"));
143+
Send(ServiceActorId_, new TEvDqSpillingLocalFile::TEvCloseFile("Client lost"), NActors::IEventHandle::FlagTrackDelivery);
139144
PassAway();
140145
}
141146

ydb/library/yql/dq/actors/spilling/spilling_file_ut.cpp

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -377,12 +377,10 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
377377
}
378378

379379
Y_UNIT_TEST(ReadError) {
380-
//return;
381-
382380
TTestActorRuntime runtime;
383381
runtime.Initialize();
384382

385-
runtime.StartSpillingService();
383+
auto spillingSvc = runtime.StartSpillingService();
386384
auto tester = runtime.AllocateEdgeActor();
387385
auto spillingActor = runtime.StartSpillingActor(tester);
388386

@@ -396,15 +394,16 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
396394
UNIT_ASSERT_VALUES_EQUAL(0, resp->Get()->BlobId);
397395
}
398396

399-
(runtime.GetSpillingRoot() / "node_1" / "1_test_0").ForceDelete();
397+
auto nodePath = TFsPath("node_" + std::to_string(spillingSvc.NodeId()));
398+
(runtime.GetSpillingRoot() / nodePath / "1_test_0").ForceDelete();
400399

401400
{
402401
auto ev = new TEvDqSpilling::TEvRead(0, true);
403402
runtime.Send(new IEventHandle(spillingActor, tester, ev));
404403

405404
auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester);
406-
auto& err = resp->Get()->Message;
407-
auto expected = "can't open \"" + runtime.GetSpillingRoot().GetPath() + "/node_1/1_test_0\" with mode RdOnly";
405+
auto err = resp->Get()->Message;
406+
auto expected = "can't open \"" + runtime.GetSpillingRoot().GetPath() + "/" + nodePath.GetPath() +"/1_test_0\" with mode RdOnly";
408407
UNIT_ASSERT_C(err.Contains("No such file or directory"), err);
409408
UNIT_ASSERT_C(err.Contains(expected), err);
410409
}
@@ -451,6 +450,34 @@ Y_UNIT_TEST_SUITE(DqSpillingFileTests) {
451450
}
452451
}
453452

453+
Y_UNIT_TEST(NoSpillingService) {
454+
TTestActorRuntime runtime;
455+
runtime.Initialize();
456+
457+
auto tester = runtime.AllocateEdgeActor();
458+
auto spillingActor = runtime.StartSpillingActor(tester);
459+
460+
runtime.WaitBootstrap();
461+
462+
// put blob 1
463+
{
464+
auto ev = new TEvDqSpilling::TEvWrite(1, CreateRope(10, 'a'));
465+
runtime.Send(new IEventHandle(spillingActor, tester, ev));
466+
467+
auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester, TDuration::Seconds(1));
468+
UNIT_ASSERT_EQUAL("Spilling Service not started", resp->Get()->Message);
469+
}
470+
471+
// get blob 1
472+
{
473+
auto ev = new TEvDqSpilling::TEvRead(1);
474+
runtime.Send(new IEventHandle(spillingActor, tester, ev));
475+
476+
auto resp = runtime.GrabEdgeEvent<TEvDqSpilling::TEvError>(tester, TDuration::Seconds(1));
477+
UNIT_ASSERT_EQUAL("Spilling Service not started", resp->Get()->Message);
478+
}
479+
}
480+
454481
} // suite
455482

456483
} // namespace NYql::NDq

0 commit comments

Comments
 (0)