Skip to content

Commit a677875

Browse files
authored
Init yt dq jobs (#1460)
1 parent 15d1fa8 commit a677875

File tree

8 files changed

+188
-4
lines changed

8 files changed

+188
-4
lines changed

ydb/library/yql/providers/dq/actors/yt/resource_manager.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace NYql {
1919
extern const TString OPERATION_SIZE;
2020
extern const TString YT_COORDINATOR;
2121
extern const TString YT_BACKEND;
22+
extern const TString YT_FORCE_IPV4;
2223
}
2324

2425
class ICoordinationHelper;
@@ -68,6 +69,8 @@ namespace NYql {
6869
int Capabilities = 0;
6970
int MaxRetries = -1;
7071

72+
bool ForceIPv4 = false;
73+
7174
// Pinger
7275
TString DieOnFileAbsence; // see YQL-14099
7376

ydb/library/yql/providers/dq/actors/yt/yt_resource_manager.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace NYql {
3333
const TString OPERATION_SIZE("OPERATION_SIZE");
3434
const TString YT_COORDINATOR("YT_COORDINATOR");
3535
const TString YT_BACKEND("YT_BACKEND");
36+
const TString YT_FORCE_IPV4("YT_FORCE_IPV4");
3637
}
3738

3839
using namespace NActors;
@@ -600,6 +601,7 @@ namespace NYql {
600601
.BeginMap()
601602
.Item(NCommonJobVars::YT_COORDINATOR).Value(coordinatorStr)
602603
.Item(NCommonJobVars::YT_BACKEND).Value(backendStr)
604+
.Item(NCommonJobVars::YT_FORCE_IPV4).Value(Options.ForceIPv4)
603605
.DoFor(Options.YtBackend.GetVaultEnv(), [&] (NYT::TFluentMap fluent, const NYql::NProto::TDqConfig::TAttr& envVar) { // Добавляем env variables
604606
TString tokenValue;
605607
try {

ydb/library/yql/tools/dq/worker_job/dq_worker.cpp

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
#include <yt/yt/core/actions/invoker.h>
2424
#include <yt/yt/core/concurrency/action_queue.h>
2525
#include <yt/yt/core/concurrency/thread_pool.h>
26+
#include <yt/yt/core/net/address.h>
27+
#include <yt/yt/core/net/config.h>
2628

2729
#include <library/cpp/protobuf/util/pb_io.h>
2830

2931
#include <util/system/fs.h>
3032
#include <util/stream/file.h>
3133
#include <util/system/env.h>
3234
#include <util/system/shellcommand.h>
35+
#include <util/string/type.h>
3336

3437
using namespace NYql::NDqs;
3538

@@ -187,11 +190,20 @@ namespace NYql::NDq::NWorker {
187190
TRangeWalker<int> portWalker(startPort, startPort+100);
188191
auto ports = BindInRange(portWalker);
189192

193+
auto forceIPv4 = IsTrue(GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_FORCE_IPV4, ""));
194+
if (forceIPv4) {
195+
auto config = NYT::New<NYT::NNet::TAddressResolverConfig>();
196+
config->EnableIPv4 = true;
197+
config->EnableIPv6 = false;
198+
NYT::NNet::TAddressResolver::Get()->Configure(config);
199+
}
200+
190201
auto [host, ip] = NYql::NDqs::GetLocalAddress(
191-
coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr
202+
coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr,
203+
forceIPv4 ? AF_INET : AF_INET6
192204
);
193205

194-
auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip);
206+
auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[forceIPv4 ? 0 : 1].Addr.GetPort(), host, ip);
195207
i64 cacheSize = backendConfig.HasCacheSize()
196208
? backendConfig.GetCacheSize()
197209
: 16000000000L;
@@ -280,8 +292,8 @@ namespace NYql::NDq::NWorker {
280292
std::tie(setup, logSettings) = BuildActorSetup(
281293
nodeId,
282294
ip,
283-
ports[1].Addr.GetPort(),
284-
ports[1].Socket->Release(),
295+
ports[forceIPv4 ? 0 : 1].Addr.GetPort(),
296+
ports[forceIPv4 ? 0 : 1].Socket->Release(),
285297
{},
286298
dqSensors,
287299
[](const TIntrusivePtr<NActors::TTableNameserverSetup>& setup) {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#include <ydb/library/yql/providers/dq/runtime/task_command_executor.h>
2+
3+
#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
4+
#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>
5+
#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
6+
7+
#include <ydb/library/yql/utils/backtrace/backtrace.h>
8+
9+
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
10+
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
11+
#include <ydb/library/yql/minikql/mkql_stats_registry.h>
12+
13+
#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
14+
#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
15+
#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h>
16+
17+
#include <library/cpp/yt/mlock/mlock.h>
18+
19+
#include <util/system/mlock.h>
20+
#include <util/stream/output.h>
21+
22+
using namespace NYql;
23+
24+
int main() {
25+
NBacktrace::RegisterKikimrFatalActions();
26+
if (!NYT::MlockFileMappings()) {
27+
Cerr << "mlockall failed, but that's fine" << Endl;
28+
}
29+
30+
NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry();
31+
32+
auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
33+
GetCommonDqFactory(),
34+
GetDqYtFactory(statsRegistry.Get()),
35+
NKikimr::NMiniKQL::GetYqlFactory(),
36+
});
37+
38+
auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({
39+
CreateCommonDqTaskTransformFactory(),
40+
CreateYtDqTaskTransformFactory(),
41+
});
42+
43+
return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true);
44+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
PROGRAM()
2+
3+
PEERDIR(
4+
library/cpp/yt/mlock
5+
yt/cpp/mapreduce/client
6+
ydb/library/yql/minikql/comp_nodes/llvm14
7+
ydb/library/yql/public/udf/service/terminate_policy
8+
ydb/library/yql/utils/backtrace
9+
ydb/library/yql/dq/comp_nodes
10+
ydb/library/yql/dq/integration/transform
11+
ydb/library/yql/dq/transform
12+
ydb/library/yql/dq/runtime
13+
ydb/library/yql/providers/common/comp_nodes
14+
ydb/library/yql/providers/dq/common
15+
ydb/library/yql/providers/dq/runtime
16+
ydb/library/yql/providers/yt/comp_nodes/dq
17+
ydb/library/yql/providers/yt/mkql_dq
18+
ydb/library/yql/providers/yt/codec/codegen
19+
ydb/library/yql/providers/yt/comp_nodes/llvm14
20+
ydb/library/yql/sql/pg
21+
ydb/library/yql/parser/pg_wrapper
22+
)
23+
24+
YQL_LAST_ABI_VERSION()
25+
26+
SRCS(
27+
main.cpp
28+
)
29+
30+
END()
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
#include <ydb/library/yql/providers/dq/actors/execution_helpers.h>
2+
#include <ydb/library/yql/providers/dq/runtime/task_command_executor.h>
3+
4+
#include <ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_factory.h>
5+
#include <ydb/library/yql/providers/yt/mkql_dq/yql_yt_dq_transform.h>
6+
7+
#include <ydb/library/yql/providers/common/comp_nodes/yql_factory.h>
8+
9+
#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.h>
10+
#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
11+
#include <ydb/library/yql/dq/transform/yql_common_dq_transform.h>
12+
13+
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
14+
#include <ydb/library/yql/tools/dq/worker_job/dq_worker.h>
15+
#include <ydb/library/yql/utils/backtrace/backtrace.h>
16+
17+
#include <library/cpp/svnversion/svnversion.h>
18+
19+
#include <library/cpp/yt/mlock/mlock.h>
20+
21+
using namespace NYql;
22+
23+
int main(int argc, const char* argv[]) {
24+
NBacktrace::RegisterKikimrFatalActions();
25+
NBacktrace::EnableKikimrSymbolize();
26+
27+
if (!NYT::MlockFileMappings()) {
28+
Cerr << "mlockall failed, but that's fine" << Endl;
29+
}
30+
31+
if (argc > 1) {
32+
if (!strcmp(argv[1], "-V")) {
33+
Cerr << ToString(GetProgramCommitId()) << Endl;
34+
return 0;
35+
} else if (!strcmp(argv[1], "tasks_runner_proxy")) {
36+
NKikimr::NMiniKQL::IStatsRegistryPtr statsRegistry = NKikimr::NMiniKQL::CreateDefaultStatsRegistry();
37+
38+
auto dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
39+
GetCommonDqFactory(),
40+
GetDqYtFactory(statsRegistry.Get()),
41+
NKikimr::NMiniKQL::GetYqlFactory(),
42+
});
43+
44+
auto dqTaskTransformFactory = CreateCompositeTaskTransformFactory({
45+
CreateCommonDqTaskTransformFactory(),
46+
CreateYtDqTaskTransformFactory(),
47+
});
48+
49+
return NTaskRunnerProxy::CreateTaskCommandExecutor(dqCompFactory, dqTaskTransformFactory, statsRegistry.Get(), true);
50+
}
51+
}
52+
53+
try {
54+
NYT::Initialize(argc, argv);
55+
56+
auto job = new NDq::NWorker::TWorkerJob();
57+
58+
job->Do();
59+
} catch (...) {
60+
Cerr << CurrentExceptionMessage();
61+
return -1;
62+
}
63+
64+
return 0;
65+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
PROGRAM()
2+
3+
PEERDIR(
4+
library/cpp/svnversion
5+
library/cpp/yt/mlock
6+
ydb/library/yql/dq/comp_nodes
7+
ydb/library/yql/dq/integration/transform
8+
ydb/library/yql/dq/transform
9+
ydb/library/yql/providers/common/comp_nodes
10+
ydb/library/yql/providers/yt/codec/codegen
11+
ydb/library/yql/providers/yt/comp_nodes/llvm14
12+
ydb/library/yql/utils/backtrace
13+
ydb/library/yql/providers/yt/comp_nodes/dq
14+
ydb/library/yql/providers/yt/mkql_dq
15+
ydb/library/yql/tools/dq/worker_job
16+
ydb/library/yql/sql/pg
17+
ydb/library/yql/parser/pg_wrapper
18+
)
19+
20+
YQL_LAST_ABI_VERSION()
21+
22+
SRCS(
23+
main.cpp
24+
)
25+
26+
END()

ydb/library/yql/yt/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,7 @@ END()
99
RECURSE(
1010
dynamic
1111
native
12+
dq_vanilla_job
13+
dq_vanilla_job.lite
1214
)
1315

0 commit comments

Comments
 (0)