From 2c74decb9e1905d411300d2a410457056fef0b6b Mon Sep 17 00:00:00 2001
From: Marina Pereskokova
Date: Tue, 30 Jan 2024 23:05:09 +0300
Subject: [PATCH 1/2] Move yql dq job core in os

---
 .../yql/tools/dq/worker_job/dq_worker.cpp     | 387 ++++++++++++++++++
 .../yql/tools/dq/worker_job/dq_worker.h       |  44 +++
 ydb/library/yql/tools/dq/worker_job/ya.make   |  31 ++
 ydb/library/yql/tools/dq/ya.make              |   1 +
 4 files changed, 463 insertions(+)
 create mode 100644 ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
 create mode 100644 ydb/library/yql/tools/dq/worker_job/dq_worker.h
 create mode 100644 ydb/library/yql/tools/dq/worker_job/ya.make

diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
new file mode 100644
index 000000000000..a47739ac4381
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_job/dq_worker.cpp
@@ -0,0 +1,387 @@
+#include "dq_worker.h"
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+
+#include
+#include
+#include
+#include
+
+using namespace NYql::NDqs;
+
+namespace {
+    // Reads cfgFile and parses it as a text-format protobuf message.
+    // Returns an empty holder (after logging an error) when parsing fails,
+    // so the caller has to check the result before using it.
+    template
+    THolder ParseProtoConfig(const TString& cfgFile) {
+        auto config = MakeHolder();
+        TString configData = TFileInput(cfgFile).ReadAll();
+
+        using ::google::protobuf::TextFormat;
+        if (!TextFormat::ParseFromString(configData, config.Get())) {
+            YQL_LOG(ERROR) << "Bad format of dq_vanilla_job configuration";
+            return {};
+        }
+
+        return config;
+    }
+
+    // Fulfilled by OnTerminate(); TWorkerJob::Do() waits on its future.
+    static NThreading::TPromise ShouldContinue = NThreading::NewPromise();
+
+    // Handler installed for SIGINT and SIGTERM by TWorkerJob::Do().
+    // TrySetValue() is used because SetValue() throws once the promise is
+    // already fulfilled, which a second signal would trigger inside the handler.
+    // NOTE(review): fulfilling a promise is presumably not async-signal-safe — confirm.
+    static void OnTerminate(int) {
+        ShouldContinue.TrySetValue();
+    }
+
+    // ITaskRunnerInvoker that forwards callbacks to a serialized invoker built
+    // on top of the given NYT invoker.
+    class TSerializedTaskRunnerInvoker: public ITaskRunnerInvoker {
+    public:
+        explicit TSerializedTaskRunnerInvoker(const NYT::IInvokerPtr& invoker)
+            : Invoker(NYT::NConcurrency::CreateSerializedInvoker(invoker))
+        { }
+
+        void Invoke(const std::function& f) override {
+            Invoker->Invoke(BIND(f));
+        }
+
+    private:
+        const NYT::IInvokerPtr Invoker;
+    };
+
+    // Owns the "WorkerActor" thread pool and wraps its invoker into a new
+    // TSerializedTaskRunnerInvoker on every Create() call.
+    class TConcurrentInvokerFactory: public ITaskRunnerInvokerFactory {
+    public:
+        explicit TConcurrentInvokerFactory(int capacity)
+            : ThreadPool(NYT::NConcurrency::CreateThreadPool(capacity, "WorkerActor"))
+        { }
+
+        ITaskRunnerInvoker::TPtr Create() override {
+            return new TSerializedTaskRunnerInvoker(ThreadPool->GetInvoker());
+        }
+
+        NYT::NConcurrency::IThreadPoolPtr ThreadPool;
+    };
+
+    // Runs `portoCtl <args...>` and waits for it to finish. A failure is only
+    // logged; the remaining porto commands still run. args must not be empty.
+    void RunPortoCtl(const TString& portoCtl, const TList<TString>& args) {
+        TShellCommand cmd(portoCtl, args);
+        if (cmd.Run().Wait().GetStatus() != TShellCommand::SHELL_FINISHED) {
+            YQL_LOG(WARN) << "portoctl " << args.front() << " failed: " << cmd.GetError();
+        }
+    }
+
+    // Creates the "Outer" porto container, applies the built-in settings and the
+    // ones from config.GetPortoSettings(), then starts the container and waits on it.
+    void ConfigurePorto(const NYql::NProto::TDqConfig::TYtBackend& config, const TString& portoCtl) {
+        const TString settings[][2] = {
+            {"enable_porto", "isolate"},
+            {"respawn", "true"}
+        };
+
+        RunPortoCtl(portoCtl, {"create", "Outer"});
+        for (const auto& setting : settings) {
+            RunPortoCtl(portoCtl, {"set", "Outer", setting[0], setting[1]});
+        }
+        for (const auto& attr : config.GetPortoSettings().GetSetting()) {
+            RunPortoCtl(portoCtl, {"set", "Outer", attr.GetName(), attr.GetValue()});
+        }
+        RunPortoCtl(portoCtl, {"start", "Outer"});
+        RunPortoCtl(portoCtl, {"wait", "Outer"});
+    }
+}
+
+namespace NYql::NDq::NWorker {
+
+    // The default configurator does not set up any metrics.
+    void TDefaultWorkerConfigurator::ConfigureMetrics(const THolder& /*loggerConfig*/, const THolder& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const {
+    }
+
+    NDq::IDqAsyncIoFactory::TPtr TDefaultWorkerConfigurator::CreateAsyncIoFactory() const {
+        return MakeIntrusive();
+    }
+
+    void TDefaultWorkerConfigurator::OnWorkerFinish() {
+    }
+
+    // Starts with TDefaultWorkerConfigurator; SetWorkerConfigurator() replaces it.
+    TWorkerJob::TWorkerJob()
+        : WorkerConfigurator(MakeHolder(TDefaultWorkerConfigurator()))
+    { }
+
+    void TWorkerJob::SetConfigFile(const TString& configFile) {
+        ConfigFile = configFile;
+    }
+
+    void TWorkerJob::SetWorkerConfigurator(THolder workerConfigurator) {
+        WorkerConfigurator = std::move(workerConfigurator);
+    }
+
+    void TWorkerJob::Do() {
+        // Vanilla job entry point: sets up the worker node, then blocks until SIGINT/SIGTERM.
+        auto loggerConfig = MakeHolder();
+        ui16 startPort = 0;
+        auto deterministicMode = !!GetEnv("YQL_DETERMINISTIC_MODE");
+
+        YQL_ENSURE(TryFromString(GetEnv(NCommonJobVars::ACTOR_PORT), startPort),
+            "Invalid service config port env var empty");
+
+        ui32 tryNodeId;
+        YQL_ENSURE(TryFromString(GetEnv(NCommonJobVars::ACTOR_NODE_ID, "0"), tryNodeId),
+            "Invalid nodeId env var");
+
+        if (!ConfigFile.empty()) {
+            loggerConfig = ParseProtoConfig(ConfigFile);
+            YQL_ENSURE(loggerConfig, "Cannot parse logger config " << ConfigFile); // empty holder on bad format
+            for (auto& logDest : *loggerConfig->MutableLogDest()) {
+                if (logDest.GetType() == NYql::NProto::TLoggingConfig::FILE) {
+                    TString logFile = logDest.GetTarget() + "." + ToString(tryNodeId);
+                    logDest.SetTarget(logFile);
+                }
+            }
+
+            loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::TRACE);
+        } else {
+            loggerConfig->SetAllComponentsLevel(NYql::NProto::TLoggingConfig::DEBUG);
+        }
+        NYql::NLog::InitLogger(*loggerConfig, false);
+        InitSignals();
+
+        TString fileCacheDir = GetEnv(NCommonJobVars::UDFS_PATH);
+        // Both configs are passed as text-format protobuf through YT_SECURE_VAULT_* env vars.
+        TString ytCoordinatorStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_COORDINATOR);
+        TString ytBackendStr = GetEnv(TString("YT_SECURE_VAULT_") + NCommonJobVars::YT_BACKEND);
+
+        TString operationId = GetEnv("YT_OPERATION_ID");
+        TString jobId = GetEnv("YT_JOB_ID");
+
+        TString operationSize = GetEnv(NCommonJobVars::OPERATION_SIZE);
+
+        NProto::TDqConfig::TYtCoordinator coordinatorConfig;
+        TStringInput inputStream1(ytCoordinatorStr);
+        ParseFromTextFormat(inputStream1, coordinatorConfig, EParseFromTextFormatOption::AllowUnknownField);
+
+        NProto::TDqConfig::TYtBackend backendConfig;
+        TStringInput inputStream2(ytBackendStr);
+        ParseFromTextFormat(inputStream2, backendConfig, EParseFromTextFormatOption::AllowUnknownField);
+
+        TRangeWalker portWalker(startPort, startPort+100);
+        auto ports = BindInRange(portWalker);
+
+        auto [host, ip] = NYql::NDqs::GetLocalAddress(
+            coordinatorConfig.HasHostName() ? &coordinatorConfig.GetHostName() : nullptr
+        );
+
+        auto coordinator = CreateCoordiantionHelper(coordinatorConfig, NProto::TDqConfig::TScheduler(), "worker_node", ports[1].Addr.GetPort(), host, ip);
+        i64 cacheSize = backendConfig.HasCacheSize()
+            ? backendConfig.GetCacheSize()
+            : 16000000000L;
+        TIntrusivePtr fileCache = new TFileCache(fileCacheDir + "/cache", cacheSize);
+        NFs::SymLink(fileCacheDir, "file_cache"); // COMPAT
+        TString layerDir = fileCacheDir + "/layer";
+        if (backendConfig.GetPortoLayer().size() > 0) {
+            NFs::MakeDirectoryRecursive(layerDir + "/mnt/work");
+            for (const auto& layerPath : backendConfig.GetPortoLayer()) {
+                auto pos = layerPath.rfind('/');
+                auto archive = layerPath.substr(pos+1); // the whole path when it has no '/' (npos + 1 == 0)
+                TShellCommand cmd("tar", {"xf", archive, "-C", layerDir});
+                if (cmd.Run().Wait().GetStatus() != TShellCommand::SHELL_FINISHED) {
+                    YQL_LOG(WARN) << "Cannot unpack porto layer " << archive << ": " << cmd.GetError();
+                }
+            }
+        } else {
+            NFs::MakeDirectoryRecursive("mnt/work");
+            NFs::MakeDirectoryRecursive("usr/local/bin");
+        }
+
+        int capacity = backendConfig.GetWorkerCapacity()
+            ? backendConfig.GetWorkerCapacity()
+            : 1;
+
+        NYql::NTaskRunnerProxy::TPipeFactoryOptions pfOptions;
+        pfOptions.ExecPath = GetExecPath();
+        pfOptions.FileCache = fileCache;
+        if (deterministicMode) {
+            YQL_LOG(DEBUG) << "deterministicMode On";
+            pfOptions.Env["YQL_DETERMINISTIC_MODE"] = "1";
+        }
+        if (backendConfig.GetEnforceJobUtc()) {
+            pfOptions.Env["TZ"] = "UTC0";
+        }
+        pfOptions.EnablePorto = backendConfig.GetEnablePorto() == "isolate";
+        pfOptions.PortoLayer = backendConfig.GetPortoLayer().size() == 0 ? "" : layerDir;
+        pfOptions.MaxProcesses = capacity*1.5;
+        pfOptions.ContainerName = "Outer";
+
+        TResourceManagerOptions rmOptions;
+        rmOptions.YtBackend = backendConfig;
+        rmOptions.FileCache = fileCache;
+        rmOptions.TmpDir = fileCacheDir + "/tmp";
+
+        if (NFs::Exists(layerDir + "/usr/bin/portoctl")) {
+            TString dst = fileCache->GetDir() + "/portoctl";
+            NFs::Copy(layerDir + "/usr/bin/portoctl", dst);
+            NFs::HardLink(layerDir + "/usr/bin/portoctl", layerDir + "/usr/sbin/portoctl"); // workaround PORTO-997
+            chmod(dst.c_str(), 0755);
+            pfOptions.PortoCtlPath = dst;
+            rmOptions.DieOnFileAbsence = dst; // die on file absence
+        }
+
+        Cerr << host << ":" << ip << Endl;
+
+        THashMap attributes;
+        attributes[NCommonAttrs::OPERATIONID_ATTR] = operationId;
+        attributes[NCommonAttrs::OPERATIONSIZE_ATTR] = operationSize;
+        attributes[NCommonAttrs::JOBID_ATTR] = jobId;
+        attributes[NCommonAttrs::CLUSTERNAME_ATTR] = backendConfig.GetClusterName();
+
+        auto nodeIdOpt = (tryNodeId == 0)
+            ? TMaybe()
+            : TMaybe(tryNodeId);
+        auto nodeId = coordinator->GetNodeId(
+            nodeIdOpt,
+            {},
+            static_cast(NDqs::ENodeIdLimits::MinWorkerNodeId),
+            static_cast(NDqs::ENodeIdLimits::MaxWorkerNodeId),
+            attributes);
+
+        Y_ABORT_UNLESS(
+            static_cast(NDqs::ENodeIdLimits::MinWorkerNodeId) <= nodeId &&
+            nodeId < static_cast(NDqs::ENodeIdLimits::MaxWorkerNodeId));
+
+        Cerr << "My nodeId: " << nodeId << Endl;
+
+        Cerr << "Configure porto" << Endl;
+        if (backendConfig.GetEnablePorto() == "isolate") {
+            ConfigurePorto(backendConfig, pfOptions.PortoCtlPath);
+        }
+        Cerr << "Configure porto done" << Endl;
+
+        auto dqSensors = GetSensorsGroupFor(NSensorComponent::kDq);
+        THolder setup;
+        TIntrusivePtr logSettings;
+        std::tie(setup, logSettings) = BuildActorSetup(
+            nodeId,
+            ip,
+            ports[1].Addr.GetPort(),
+            ports[1].Socket->Release(),
+            {},
+            dqSensors,
+            [](const TIntrusivePtr& setup) {
+                return NYql::NDqs::CreateDynamicNameserver(setup);
+            },
+            Nothing(),
+            backendConfig.GetICSettings());
+
+        auto statsCollector = CreateStatsCollector(5, *setup.Get(), dqSensors);
+
+        auto actorSystem = MakeHolder(setup, nullptr, logSettings);
+
+        actorSystem->Start();
+
+        actorSystem->Register(statsCollector);
+
+        TVector hostPortPairs;
+        for (const auto& hostPortPair : coordinatorConfig.GetServiceNodeHostPort()) {
+            hostPortPairs.emplace_back(hostPortPair);
+            // tests
+            if (hostPortPair.StartsWith("localhost")) {
+                rmOptions.ExitOnPingFail = true;
+            }
+        }
+
+        WorkerConfigurator->ConfigureMetrics(loggerConfig, actorSystem, backendConfig, rmOptions, nodeId);
+
+        // rmOptions.MetricsRegistry = CreateMetricsRegistry(dqSensors); // send metrics to gwm, unsupported
+        auto resolver = coordinator->CreateServiceNodeResolver(actorSystem.Get(), hostPortPairs);
+        actorSystem->Register(coordinator->CreateServiceNodePinger(resolver, rmOptions, attributes));
+
+        NLog::YqlLogger().UpdateProcInfo(jobId + "/" + GetGuidAsString(coordinator->GetRuntimeData()->WorkerId));
+
+        // For testing only
+        THashMap clusterMapping;
+        clusterMapping["plato"] = backendConfig.GetClusterName();
+
+        auto proxyFactory = NTaskRunnerProxy::CreatePipeFactory(pfOptions);
+        ITaskRunnerInvokerFactory::TPtr invokerFactory = new TConcurrentInvokerFactory(2*capacity);
+        auto taskRunnerActorFactory = NTaskRunnerActor::CreateTaskRunnerActorFactory(proxyFactory, invokerFactory, nullptr, coordinator->GetRuntimeData());
+
+        TLocalWorkerManagerOptions lwmOptions;
+        lwmOptions.Factory = proxyFactory;
+        lwmOptions.TaskRunnerActorFactory = taskRunnerActorFactory;
+        lwmOptions.AsyncIoFactory = WorkerConfigurator->CreateAsyncIoFactory();
+        lwmOptions.RuntimeData = coordinator->GetRuntimeData();
+        lwmOptions.TaskRunnerInvokerFactory = invokerFactory;
+        lwmOptions.ClusterNamesMapping = clusterMapping;
+        lwmOptions.ComputeActorOwnsCounters = true;
+
+        auto resman = NDqs::CreateLocalWorkerManager(lwmOptions);
+
+        auto workerManagerActorId = actorSystem->Register(resman);
+        actorSystem->RegisterLocalService(MakeWorkerManagerActorID(nodeId), workerManagerActorId);
+
+        if (backendConfig.HasSpillingSettings()) {
+            auto spilling = NDq::CreateDqLocalFileSpillingService(
+                NDq::TFileSpillingServiceConfig {
+                    .Root = backendConfig.GetSpillingSettings().GetRoot(),
+                    .MaxTotalSize = backendConfig.GetSpillingSettings().GetMaxTotalSize(),
+                    .MaxFileSize = backendConfig.GetSpillingSettings().GetMaxFileSize(),
+                    .MaxFilePartSize = backendConfig.GetSpillingSettings().GetMaxFilePartSize(),
+                    .IoThreadPoolWorkersCount = backendConfig.GetSpillingSettings().GetIoThreadPoolWorkersCount(),
+                    .IoThreadPoolQueueSize = backendConfig.GetSpillingSettings().GetIoThreadPoolQueueSize(),
+                    .CleanupOnShutdown = backendConfig.GetSpillingSettings().GetCleanupOnShutdown()
+                },
+                MakeIntrusive(dqSensors)
+            );
+            auto spillingActor = actorSystem->Register(spilling);
+            actorSystem->RegisterLocalService(NDq::MakeDqLocalFileSpillingServiceID(nodeId), spillingActor);
+        }
+
+        auto endFuture = ShouldContinue.GetFuture();
+        // From here on SIGINT/SIGTERM fulfil ShouldContinue through OnTerminate().
+        signal(SIGINT, &OnTerminate);
+        signal(SIGTERM, &OnTerminate);
+        signal(SIGPIPE, SIG_IGN);
+
+        // run forever
+
+        endFuture.Wait();
+        WorkerConfigurator->OnWorkerFinish();
+        actorSystem->Stop();
+        dqSensors->OutputHtml(Cerr);
+    }
+
+    REGISTER_VANILLA_JOB(TWorkerJob);
+
+} // namespace NYql::NDq::NWorker
diff --git a/ydb/library/yql/tools/dq/worker_job/dq_worker.h b/ydb/library/yql/tools/dq/worker_job/dq_worker.h
new file mode 100644
index 000000000000..f37895c6dca6
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_job/dq_worker.h
@@ -0,0 +1,44 @@
+#pragma once
+
+#include
+#include
+
+#include
+
+#include
+
+#include
+
+namespace NYql::NDq::NWorker {
+    struct IWorkerConfigurator
+    {
+        virtual ~IWorkerConfigurator() = default;
+        // Hooks called from TWorkerJob::Do():
+        virtual void ConfigureMetrics(const THolder& loggerConfig, const THolder& actorSystem, const NProto::TDqConfig::TYtBackend& backendConfig, const TResourceManagerOptions& rmOptions, ui32 nodeId) const = 0;
+        virtual IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() const = 0;
+        virtual void OnWorkerFinish() = 0; // called after the termination signal, before the actor system stops
+    };
+
+    struct TDefaultWorkerConfigurator // installed by the TWorkerJob constructor
+        : public IWorkerConfigurator
+    {
+        void ConfigureMetrics(const THolder& /*loggerConfig*/, const THolder& /*actorSystem*/, const NProto::TDqConfig::TYtBackend& /*backendConfig*/, const TResourceManagerOptions& /*rmOptions*/, ui32 /*nodeId*/) const override;
+        IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() const override;
+        void OnWorkerFinish() override;
+    };
+
+    class TWorkerJob: public NYT::IVanillaJob {
+    public:
+        TWorkerJob();
+
+        void SetConfigFile(const TString& configFile); // logger config file; optional
+        void SetWorkerConfigurator(THolder workerConfigurator);
+
+        void Do() override; // blocks until SIGINT/SIGTERM
+
+    private:
+        TString ConfigFile;
+        THolder WorkerConfigurator;
+    };
+
+} // namespace NYql::NDq::NWorker
diff --git a/ydb/library/yql/tools/dq/worker_job/ya.make b/ydb/library/yql/tools/dq/worker_job/ya.make
new file mode 100644
index 000000000000..863f494f4b0b
--- /dev/null
+++ b/ydb/library/yql/tools/dq/worker_job/ya.make
@@ -0,0 +1,31 @@
+LIBRARY()
+
+SRCS(
+    dq_worker.cpp
+)
+
+PEERDIR(
+    contrib/libs/protobuf
+    library/cpp/protobuf/util
+    yt/cpp/mapreduce/client
+    yt/cpp/mapreduce/interface
+    yt/yt/core
+    ydb/library/yql/dq/actors/spilling
+    ydb/library/yql/minikql/comp_nodes/llvm14
+    ydb/library/yql/providers/common/metrics
+    ydb/library/yql/providers/dq/runtime
+    ydb/library/yql/providers/dq/service
+    ydb/library/yql/providers/dq/stats_collector
+    ydb/library/yql/providers/dq/task_runner
+    ydb/library/yql/public/udf/service/terminate_policy
+    ydb/library/yql/utils
+    ydb/library/yql/utils/log
+    ydb/library/yql/utils/log/proto
+    ydb/library/yql/providers/dq/actors/yt
+    ydb/library/yql/providers/dq/global_worker_manager
+    ydb/library/yql/utils/signals
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
\ No newline at end of file
diff --git a/ydb/library/yql/tools/dq/ya.make b/ydb/library/yql/tools/dq/ya.make
index 1043bcb32795..3bdf652c4015 100644
--- a/ydb/library/yql/tools/dq/ya.make
+++ b/ydb/library/yql/tools/dq/ya.make
@@ -2,4 +2,5 @@ RECURSE(
     dq_cli
     service_node
     worker_node
+    worker_job
 )

From e0caebd5e2ed6907318f79c095ca02df9943345f Mon Sep 17 00:00:00 2001
From: Marina Pereskokova
Date: Tue, 30 Jan 2024 23:06:38 +0300
Subject: [PATCH 2/2] Codestyle

---
 ydb/library/yql/tools/dq/worker_job/ya.make | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ydb/library/yql/tools/dq/worker_job/ya.make b/ydb/library/yql/tools/dq/worker_job/ya.make
index 863f494f4b0b..14971a568b26 100644
--- a/ydb/library/yql/tools/dq/worker_job/ya.make
+++ b/ydb/library/yql/tools/dq/worker_job/ya.make
@@ -28,4 +28,4 @@ PEERDIR(
 
 YQL_LAST_ABI_VERSION()
 
-END()
\ No newline at end of file
+END()