diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index b1b91aaf73..4a8ca74921 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -124,6 +124,7 @@ set(SUB_DIRECTORIES_LIST prometheus prometheus/labels prometheus/schedulers prometheus/async ebpf ebpf/observer ebpf/security ebpf/handler parser sls_control sdk + host_monitor host_monitor/collector ) if (LINUX) if (ENABLE_ENTERPRISE) diff --git a/core/app_config/AppConfig.cpp b/core/app_config/AppConfig.cpp index d492af6b7b..a660601346 100644 --- a/core/app_config/AppConfig.cpp +++ b/core/app_config/AppConfig.cpp @@ -164,6 +164,7 @@ DEFINE_FLAG_STRING(loong_collector_operator_service, "loong collector operator s DEFINE_FLAG_INT32(loong_collector_operator_service_port, "loong collector operator service port", 8888); DEFINE_FLAG_INT32(loong_collector_k8s_meta_service_port, "loong collector operator service port", 9000); DEFINE_FLAG_STRING(_pod_name_, "agent pod name", ""); +DEFINE_FLAG_INT32(process_collect_silent_count, "number of process scanned between a sleep", 1000); DEFINE_FLAG_STRING(app_info_file, "", "app_info.json"); DEFINE_FLAG_STRING(crash_stack_file_name, "crash stack back trace file name", "backtrace.dat"); diff --git a/core/application/Application.cpp b/core/application/Application.cpp index 09901cced9..7cd18c73e3 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -14,6 +14,8 @@ #include "application/Application.h" +#include "timer/Timer.h" + #ifndef LOGTAIL_NO_TC_MALLOC #include #endif @@ -263,9 +265,9 @@ void Application::Start() { // GCOVR_EXCL_START } ProcessorRunner::GetInstance()->Init(); + Timer::GetInstance()->Init(); - time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, - lastCheckTagsTime = 0, lastQueueGCTime = 0; + time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0; #ifndef LOGTAIL_NO_TC_MALLOC time_t lastTcmallocReleaseMemTime = 0; #endif diff --git a/core/common/FileSystemUtil.cpp b/core/common/FileSystemUtil.cpp index 39ef57c067..28d51fb9f2 100644 --- a/core/common/FileSystemUtil.cpp +++ b/core/common/FileSystemUtil.cpp @@ -141,6 +141,50 @@ bool ReadFileContent(const std::string& fileName, std::string& content, uint32_t return true; } +int GetLines(std::istream& is, + bool enableEmptyLine, + const std::function& pushBack, + std::string* errorMessage) { + std::string line; + // 此处必须判断eof,具体原因参见: + // https://stackoverflow.com/questions/40561482/getline-throws-basic-iosclear-exception-after-reading-the-last-line + while (!is.eof() && std::getline(is, line)) { + if (enableEmptyLine || !line.empty()) { + pushBack(line); + } + } + return 0; +} + +int GetLines(const std::filesystem::path& filename, + bool enableEmptyLine, + const std::function& pushBack, + std::string* errorMessage) { + int ret = 0; + std::ifstream fin; + try { + fin.exceptions(std::ifstream::failbit | std::ifstream::badbit); + fin.open(filename.string(), std::ios_base::in); + fin.exceptions(std::ifstream::goodbit); + GetLines(fin, enableEmptyLine, pushBack, errorMessage); + fin.close(); + } catch (const std::exception& fail) { + if (errorMessage != nullptr) { + LOG_ERROR(sLogger, ("open file fail", filename)("errno", strerror(errno))); + ret = -1; + } + fin.close(); + } + return ret; +} + +int GetFileLines(const std::filesystem::path& filename, + std::vector& res, + bool enableEmptyLine, + std::string* errorMessage) { + return GetLines(filename, enableEmptyLine, [&res](const std::string& s) { res.push_back(s); }, errorMessage); +} + bool OverwriteFile(const std::string& fileName, const std::string& content) { FILE* pFile = fopen(fileName.c_str(), "w"); if (pFile == NULL) { diff --git a/core/common/FileSystemUtil.h b/core/common/FileSystemUtil.h index 3ad1b046ed..4254c85b3f 100644 --- a/core/common/FileSystemUtil.h +++ b/core/common/FileSystemUtil.h @@ -26,6 +26,8 @@ #elif defined(_MSC_VER) #include #endif +#include + #include "DevInode.h" #include "ErrorUtil.h" #include "LogtailCommonFlags.h" @@ -87,6 +89,19 @@ void TrimLastSeperator(std::string& path); // ReadFileContent reads all content of @fileName to @content. bool ReadFileContent(const std::string& fileName, std::string& content, uint32_t maxFileSize = 8192); +int GetLines(std::istream& is, + bool enableEmptyLine, + const std::function& pushBack, + std::string* errorMessage); +int GetLines(const std::filesystem::path& filename, + bool enableEmptyLine, + const std::function& pushBack, + std::string* errorMessage); +int GetFileLines(const std::filesystem::path& filename, + std::vector& res, + bool enableEmptyLine = true, + std::string* errorMessage = nullptr); + // OverwriteFile overwrides @fileName with @content. bool OverwriteFile(const std::string& fileName, const std::string& content); diff --git a/core/common/MachineInfoUtil.cpp b/core/common/MachineInfoUtil.cpp index 47e0650fa7..d545802427 100644 --- a/core/common/MachineInfoUtil.cpp +++ b/core/common/MachineInfoUtil.cpp @@ -15,6 +15,11 @@ #include "MachineInfoUtil.h" #include + +#include + +#include "AppConfig.h" +#include "common/UUIDUtil.h" #if defined(__linux__) #include #include @@ -40,8 +45,10 @@ #include "FileSystemUtil.h" #include "StringTools.h" +#include "common/FileSystemUtil.h" #include "logger/Logger.h" +DEFINE_FLAG_STRING(agent_host_id, "", ""); #if defined(_MSC_VER) typedef LONG NTSTATUS, *PNTSTATUS; @@ -496,6 +503,92 @@ size_t FetchECSMetaCallback(char* buffer, size_t size, size_t nmemb, std::string return sizes; } +std::string GetSerialNumberFromEcsAssist(const std::string& machineIdFile) { + std::string sn; + if (CheckExistance(machineIdFile)) { + if (!ReadFileContent(machineIdFile, sn)) { + return ""; + } + } + return sn; +} + +static std::string GetEcsAssistMachineIdFile() { +#if defined(WIN32) + return "C:\\ProgramData\\aliyun\\assist\\hybrid\\machine-id"; +#else + return "/usr/local/share/aliyun-assist/hybrid/machine-id"; +#endif +} + +std::string GetSerialNumberFromEcsAssist() { + return GetSerialNumberFromEcsAssist(GetEcsAssistMachineIdFile()); +} + +std::string RandomHostid() { + static std::string hostId = CalculateRandomUUID(); + return hostId; +} + +const std::string& GetLocalHostId() { + static std::string fileName = AppConfig::GetInstance()->GetLoongcollectorConfDir() + PATH_SEPARATOR + "host_id"; + static std::string hostId; + if (!hostId.empty()) { + return hostId; + } + if (CheckExistance(fileName)) { + if (!ReadFileContent(fileName, hostId)) { + hostId = ""; + } + } + if (hostId.empty()) { + hostId = RandomHostid(); + + LOG_INFO(sLogger, ("save hostId file to local file system, hostId", hostId)); + int fd = open(fileName.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0755); + if (fd == -1) { + int savedErrno = errno; + if (savedErrno != EEXIST) { + LOG_ERROR(sLogger, ("save hostId file fail", fileName)("errno", strerror(savedErrno))); + } + } else { + // 文件成功创建,现在写入hostId + ssize_t written = write(fd, hostId.c_str(), hostId.length()); + if (written == static_cast(hostId.length())) { + LOG_INFO(sLogger, ("hostId saved successfully to", fileName)); + } else { + int writeErrno = errno; + LOG_ERROR(sLogger, ("Failed to write hostId to file", fileName)("errno", strerror(writeErrno))); + } + close(fd); + } + } + return hostId; +} + +std::string FetchHostId() { + static std::string hostId; + if (!hostId.empty()) { + return hostId; + } + hostId = STRING_FLAG(agent_host_id); + if (!hostId.empty()) { + return hostId; + } + ECSMeta meta = FetchECSMeta(); + hostId = meta.instanceID; + if (!hostId.empty()) { + return hostId; + } + hostId = GetSerialNumberFromEcsAssist(); + if (!hostId.empty()) { + return hostId; + } + hostId = GetLocalHostId(); + + return hostId; +} + ECSMeta FetchECSMeta() { CURL* curl; for (size_t retryTimes = 1; retryTimes <= 5; retryTimes++) { diff --git a/core/common/MachineInfoUtil.h b/core/common/MachineInfoUtil.h index 0983cbf352..d8d6a3e963 100644 --- a/core/common/MachineInfoUtil.h +++ b/core/common/MachineInfoUtil.h @@ -15,10 +15,11 @@ */ #pragma once +#include +#include + #include #include -#include -#include namespace logtail { @@ -37,8 +38,9 @@ std::string GetHostIp(const std::string& intf = ""); void GetAllPids(std::unordered_set& pids); bool GetKernelInfo(std::string& kernelRelease, int64_t& kernelVersion); bool GetRedHatReleaseInfo(std::string& os, int64_t& osVersion, std::string bashPath = ""); -bool IsDigitsDotsHostname(const char *hostname); +bool IsDigitsDotsHostname(const char* hostname); ECSMeta FetchECSMeta(); +std::string FetchHostId(); // GetAnyAvailableIP walks through all interfaces (AF_INET) to find an available IP. // Priority: diff --git a/core/common/StringTools.cpp b/core/common/StringTools.cpp index 0e800cb754..36ad405bfc 100644 --- a/core/common/StringTools.cpp +++ b/core/common/StringTools.cpp @@ -363,4 +363,12 @@ void RemoveFilePathTrailingSlash(std::string& filePath) { filePath = path.string(); } +bool IsInt(const char* sz) { + bool ok = (sz != nullptr && *sz != '\0'); + for (auto* it = sz; ok && *it; ++it) { + ok = (0 != std::isdigit(*it)); + } + return ok; +} + } // namespace logtail diff --git a/core/common/StringTools.h b/core/common/StringTools.h index 4d519dc8fa..da20c45e73 100644 --- a/core/common/StringTools.h +++ b/core/common/StringTools.h @@ -146,6 +146,12 @@ bool NormalizeTopicRegFormat(std::string& regStr); void RemoveFilePathTrailingSlash(std::string& path); +bool IsInt(const char* sz); + +inline bool IsInt(const std::string& str) { + return IsInt(str.c_str()); +} + #if defined(_MSC_VER) // TODO: Test it. #define FNM_PATHNAME 0 diff --git a/core/common/timer/Timer.cpp b/core/common/timer/Timer.cpp index 08ed7ed12c..57c7ab77f1 100644 --- a/core/common/timer/Timer.cpp +++ b/core/common/timer/Timer.cpp @@ -23,6 +23,9 @@ namespace logtail { void Timer::Init() { { lock_guard lock(mThreadRunningMux); + if (mIsThreadRunning) { + return; + } mIsThreadRunning = true; } mThreadRes = async(launch::async, &Timer::Run, this); @@ -31,6 +34,9 @@ void Timer::Init() { void Timer::Stop() { { lock_guard lock(mThreadRunningMux); + if (!mIsThreadRunning) { + return; + } mIsThreadRunning = false; } mCV.notify_one(); diff --git a/core/common/timer/Timer.h b/core/common/timer/Timer.h index 6825e8443d..ad1ca15be4 100644 --- a/core/common/timer/Timer.h +++ b/core/common/timer/Timer.h @@ -34,11 +34,21 @@ struct TimerEventCompare { class Timer { public: + Timer(const Timer&) = delete; + Timer(Timer&&) = delete; + Timer& operator=(const Timer&) = delete; + Timer& operator=(Timer&&) = delete; + ~Timer() = default; + static Timer* GetInstance() { + static Timer sInstance; + return &sInstance; + } void Init(); void Stop(); void PushEvent(std::unique_ptr&& e); private: + Timer() = default; void Run(); mutable std::mutex mQueueMux; @@ -53,6 +63,7 @@ class Timer { #ifdef APSARA_UNIT_TEST_MAIN friend class TimerUnittest; friend class ScrapeSchedulerUnittest; + friend class HostMonitorInputRunnerUnittest; #endif }; diff --git a/core/common/timer/TimerEvent.h b/core/common/timer/TimerEvent.h index 005d60e882..e6064b667a 100644 --- a/core/common/timer/TimerEvent.h +++ b/core/common/timer/TimerEvent.h @@ -29,6 +29,7 @@ class TimerEvent { virtual bool Execute() = 0; std::chrono::steady_clock::time_point GetExecTime() const { return mExecTime; } + void SetExecTime(std::chrono::steady_clock::time_point nextExecTime) { mExecTime = nextExecTime; } private: std::chrono::steady_clock::time_point mExecTime; diff --git a/core/constants/EntityConstants.cpp b/core/constants/EntityConstants.cpp new file mode 100644 index 0000000000..17df9c2b5c --- /dev/null +++ b/core/constants/EntityConstants.cpp @@ -0,0 +1,48 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "EntityConstants.h" + +namespace logtail { + +const std::string DEFAULT_ENV_KEY_HOST_TYPE = "HOST_TYPE"; +const std::string DEFAULT_ENV_VALUE_ECS = "ecs"; +const std::string DEFAULT_ENV_VALUE_HOST = "host"; +const std::string DEFAULT_CONTENT_KEY_ENTITY_TYPE = "__entity_type__"; +const std::string DEFAULT_CONTENT_KEY_ENTITY_ID = "__entity_id__"; +const std::string DEFAULT_CONTENT_KEY_DOMAIN = "__domain__"; +const std::string DEFAULT_CONTENT_VALUE_DOMAIN_ACS = "acs"; +const std::string DEFAULT_CONTENT_VALUE_DOMAIN_INFRA = "infra"; +const std::string DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME = "__first_observed_time__"; +const std::string DEFAULT_CONTENT_KEY_LAST_OBSERVED_TIME = "__last_observed_time__"; +const std::string DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS = "__keep_alive_seconds__"; +const std::string DEFAULT_CONTENT_KEY_METHOD = "__method__"; +const std::string DEFAULT_CONTENT_VALUE_METHOD_UPDATE = "update"; +const std::string DEFAULT_CONTENT_VALUE_METHOD_EXPIRE = "expire"; + +// for process entity +const std::string DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS = "process"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_PID = "process_pid"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_PPID = "process_ppid"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_USER = "process_user"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_COMM = "process_comm"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME = "process_create_time"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_CWD = "process_cwd"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_BINARY = "process_binary"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS = "process_arguments"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE = "process_language"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID = "process_container_id"; +} // namespace logtail diff --git a/core/constants/EntityConstants.h b/core/constants/EntityConstants.h new file mode 100644 index 0000000000..0da3a13920 --- /dev/null +++ b/core/constants/EntityConstants.h @@ -0,0 +1,48 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace logtail { + +extern const std::string DEFAULT_ENV_KEY_HOST_TYPE; +extern const std::string DEFAULT_ENV_VALUE_ECS; +extern const std::string DEFAULT_ENV_VALUE_HOST; +extern const std::string DEFAULT_CONTENT_KEY_ENTITY_TYPE; +extern const std::string DEFAULT_CONTENT_KEY_ENTITY_ID; +extern const std::string DEFAULT_CONTENT_KEY_DOMAIN; +extern const std::string DEFAULT_CONTENT_VALUE_DOMAIN_ACS; +extern const std::string DEFAULT_CONTENT_VALUE_DOMAIN_INFRA; +extern const std::string DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME; +extern const std::string DEFAULT_CONTENT_KEY_LAST_OBSERVED_TIME; +extern const std::string DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS; +extern const std::string DEFAULT_CONTENT_KEY_METHOD; +extern const std::string DEFAULT_CONTENT_VALUE_METHOD_UPDATE; +extern const std::string DEFAULT_CONTENT_VALUE_METHOD_EXPIRE; + +// for process entity +extern const std::string DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_PID; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_PPID; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_USER; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_COMM; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_CWD; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_BINARY; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID; +} // namespace logtail diff --git a/core/host_monitor/Constants.cpp b/core/host_monitor/Constants.cpp new file mode 100644 index 0000000000..4d538400e7 --- /dev/null +++ b/core/host_monitor/Constants.cpp @@ -0,0 +1,30 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Constants.h" + + +namespace logtail { + +#ifndef APSARA_UNIT_TEST_MAIN +const std::filesystem::path PROCESS_DIR = "/proc"; +#else +std::filesystem::path PROCESS_DIR = "/proc"; +#endif + +const std::filesystem::path PROCESS_STAT = "stat"; + +} // namespace logtail diff --git a/core/host_monitor/Constants.h b/core/host_monitor/Constants.h new file mode 100644 index 0000000000..cdbf3e7dd2 --- /dev/null +++ b/core/host_monitor/Constants.h @@ -0,0 +1,31 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace logtail { + +#ifndef APSARA_UNIT_TEST_MAIN +extern const std::filesystem::path PROCESS_DIR; +#else +extern std::filesystem::path PROCESS_DIR; +#endif + +const extern std::filesystem::path PROCESS_STAT; + +} // namespace logtail diff --git a/core/host_monitor/HostMonitorInputRunner.cpp b/core/host_monitor/HostMonitorInputRunner.cpp new file mode 100755 index 0000000000..926be0151f --- /dev/null +++ b/core/host_monitor/HostMonitorInputRunner.cpp @@ -0,0 +1,158 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "HostMonitorInputRunner.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "HostMonitorTimerEvent.h" +#include "common/Lock.h" +#include "common/timer/Timer.h" +#include "host_monitor/collector/ProcessCollector.h" +#include "logger/Logger.h" +#include "runner/ProcessorRunner.h" + + +namespace logtail { + +HostMonitorInputRunner::HostMonitorInputRunner() : mThreadPool(ThreadPool(3)) { + RegisterCollector(); +} + +void HostMonitorInputRunner::UpdateCollector(const std::string& configName, + const std::vector& newCollectors, + QueueKey processQueueKey, + int inputIndex) { + std::vector oldCollectors; + { + std::unique_lock lock(mCollectorRegisterMapMutex); + auto it = mCollectorRegisterMap.find(configName); + if (it != mCollectorRegisterMap.end()) { + oldCollectors = it->second; + } + mCollectorRegisterMap[configName] = newCollectors; + } + for (const auto& collectorName : newCollectors) { + LOG_INFO(sLogger, ("add new host monitor collector", configName)("collector", collectorName)); + auto collectConfig = std::make_unique( + configName, collectorName, processQueueKey, inputIndex, std::chrono::seconds(DEFAULT_SCHEDULE_INTERVAL)); + // only push event when the collector is new added + if (std::find(oldCollectors.begin(), oldCollectors.end(), collectorName) == oldCollectors.end()) { + Timer::GetInstance()->PushEvent(BuildTimerEvent(std::move(collectConfig))); + } + } +} + +void HostMonitorInputRunner::RemoveCollector(const std::string& configName) { + std::unique_lock lock(mCollectorRegisterMapMutex); + mCollectorRegisterMap.erase(configName); +} + +void HostMonitorInputRunner::Init() { + if (mIsStarted.exchange(true)) { + return; + } + LOG_INFO(sLogger, ("HostMonitorInputRunner", "Start")); +#ifndef APSARA_UNIT_TEST_MAIN + mThreadPool.Start(); +#endif +} + +void HostMonitorInputRunner::Stop() { + if (!mIsStarted.exchange(false)) { + return; + } +#ifndef APSARA_UNIT_TEST_MAIN + std::future result = std::async(std::launch::async, [this]() { mThreadPool.Stop(); }); + if (result.wait_for(std::chrono::seconds(3)) == std::future_status::timeout) { + LOG_ERROR(sLogger, ("HostMonitorInputRunner stop timeout 3 seconds", "may cause thread leak")); + } +#endif + LOG_INFO(sLogger, ("HostMonitorInputRunner", "Stop")); +} + +bool HostMonitorInputRunner::HasRegisteredPlugins() const { + std::shared_lock lock(mCollectorRegisterMapMutex); + return !mCollectorRegisterMap.empty(); +} + +bool HostMonitorInputRunner::IsCollectTaskValid(const std::string& configName, const std::string& collectorName) const { + std::shared_lock lock(mCollectorRegisterMapMutex); + auto collectors = mCollectorRegisterMap.find(configName); + if (collectors == mCollectorRegisterMap.end()) { + return false; + } + for (const auto& collectorName : collectors->second) { + if (collectorName == collectorName) { + return true; + } + } + return false; +} + +void HostMonitorInputRunner::ScheduleOnce(std::unique_ptr collectConfig) { + mThreadPool.Add([this, &collectConfig]() { + PipelineEventGroup group(std::make_shared()); + auto collector = GetCollector(collectConfig->mCollectorName); + if (!collector) { + collector->Collect(group); + } + + bool result = ProcessorRunner::GetInstance()->PushQueue( + collectConfig->mProcessQueueKey, collectConfig->mInputIndex, std::move(group), 3); + if (!result) { + LOG_WARNING(sLogger, + ("push queue failed", "discard data")("config", collectConfig->mConfigName)( + "collector", collectConfig->mCollectorName)); + } + LOG_DEBUG(sLogger, + ("schedule host monitor collector again", collectConfig->mConfigName)("collector", + collectConfig->mCollectorName)); + + auto event = BuildTimerEvent(std::move(collectConfig)); + event->ResetForNextExec(); + Timer::GetInstance()->PushEvent(std::move(event)); + }); +} + +std::unique_ptr +HostMonitorInputRunner::BuildTimerEvent(std::unique_ptr collectConfig) { + auto now = std::chrono::steady_clock::now(); + auto event = std::make_unique(now, std::move(collectConfig)); + return event; +} + +std::shared_ptr HostMonitorInputRunner::GetCollector(const std::string& collectorName) { + auto it = mCollectorInstanceMap.find(collectorName); + if (it == mCollectorInstanceMap.end()) { + return nullptr; + } + return it->second; +} + +template +void HostMonitorInputRunner::RegisterCollector() { + auto collector = std::make_shared(); + mCollectorInstanceMap[collector->GetName()] = collector; +} + +} // namespace logtail diff --git a/core/host_monitor/HostMonitorInputRunner.h b/core/host_monitor/HostMonitorInputRunner.h new file mode 100644 index 0000000000..f83ffc3af9 --- /dev/null +++ b/core/host_monitor/HostMonitorInputRunner.h @@ -0,0 +1,84 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "BaseCollector.h" +#include "InputRunner.h" +#include "Lock.h" +#include "QueueKey.h" +#include "ThreadPool.h" +#include "host_monitor/HostMonitorTimerEvent.h" + +namespace logtail { + +const int DEFAULT_SCHEDULE_INTERVAL = 10; + +class HostMonitorInputRunner : public InputRunner { +public: + HostMonitorInputRunner(const HostMonitorInputRunner&) = delete; + HostMonitorInputRunner(HostMonitorInputRunner&&) = delete; + HostMonitorInputRunner& operator=(const HostMonitorInputRunner&) = delete; + HostMonitorInputRunner& operator=(HostMonitorInputRunner&&) = delete; + ~HostMonitorInputRunner() override = default; + static HostMonitorInputRunner* GetInstance() { + static HostMonitorInputRunner sInstance; + return &sInstance; + } + + void UpdateCollector(const std::string& configName, + const std::vector& collectorNames, + QueueKey processQueueKey, + int inputIndex); + void RemoveCollector(const std::string& configName); + + void Init() override; + void Stop() override; + bool HasRegisteredPlugins() const override; + + bool IsCollectTaskValid(const std::string& configName, const std::string& collectorName) const; + void ScheduleOnce(std::unique_ptr collectConfig); + +private: + HostMonitorInputRunner(); + std::unique_ptr + BuildTimerEvent(std::unique_ptr collectConfig); + + template + void RegisterCollector(); + std::shared_ptr GetCollector(const std::string& collectorName); + + std::atomic_bool mIsStarted = false; + + ThreadPool mThreadPool; + + mutable std::shared_mutex mCollectorRegisterMapMutex; + std::unordered_map> mCollectorRegisterMap; + std::unordered_map> mCollectorInstanceMap; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class HostMonitorInputRunnerUnittest; +#endif +}; + +} // namespace logtail diff --git a/core/host_monitor/HostMonitorTimerEvent.cpp b/core/host_monitor/HostMonitorTimerEvent.cpp new file mode 100644 index 0000000000..3fb5079309 --- /dev/null +++ b/core/host_monitor/HostMonitorTimerEvent.cpp @@ -0,0 +1,35 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "HostMonitorTimerEvent.h" + +#include + +#include "HostMonitorInputRunner.h" + +namespace logtail { + +bool HostMonitorTimerEvent::IsValid() const { + return HostMonitorInputRunner::GetInstance()->IsCollectTaskValid(mCollectConfig->mConfigName, + mCollectConfig->mCollectorName); +} + +bool HostMonitorTimerEvent::Execute() { + HostMonitorInputRunner::GetInstance()->ScheduleOnce(std::move(mCollectConfig)); + return true; +} + +} // namespace logtail diff --git a/core/host_monitor/HostMonitorTimerEvent.h b/core/host_monitor/HostMonitorTimerEvent.h new file mode 100644 index 0000000000..07a3dceb40 --- /dev/null +++ b/core/host_monitor/HostMonitorTimerEvent.h @@ -0,0 +1,62 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "QueueKey.h" +#include "timer/TimerEvent.h" + +namespace logtail { + + +class HostMonitorTimerEvent : public TimerEvent { +public: + struct CollectConfig { + CollectConfig(std::string configName, + std::string collectorName, + QueueKey processQueueKey, + int inputIndex, + std::chrono::seconds interval) + : mConfigName(configName), + mCollectorName(collectorName), + mProcessQueueKey(processQueueKey), + mInputIndex(inputIndex), + mInterval(interval) {} + + std::string mConfigName; + std::string mCollectorName; + QueueKey mProcessQueueKey; + size_t mInputIndex; + std::chrono::seconds mInterval; + }; + + HostMonitorTimerEvent(std::chrono::steady_clock::time_point execTime, std::unique_ptr collectConfig) + : TimerEvent(execTime), mCollectConfig(std::move(collectConfig)) {} + + bool IsValid() const override; + bool Execute() override; + void ResetForNextExec() { SetExecTime(GetExecTime() + mCollectConfig->mInterval); } + +private: + std::unique_ptr mCollectConfig; +}; + +} // namespace logtail diff --git a/core/host_monitor/SystemInformationTools.cpp b/core/host_monitor/SystemInformationTools.cpp new file mode 100644 index 0000000000..edbad31cab --- /dev/null +++ b/core/host_monitor/SystemInformationTools.cpp @@ -0,0 +1,50 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SystemInformationTools.h" + +#include "Logger.h" +#include "host_monitor/Constants.h" + +namespace logtail { + +int64_t GetSystemBootSeconds() { + static int64_t systemBootSeconds; + if (systemBootSeconds != 0) { + return systemBootSeconds; + } + + std::vector cpuLines = {}; + std::string errorMessage; + int ret = GetFileLines(PROCESS_DIR / PROCESS_STAT, cpuLines, true, &errorMessage); + LOG_WARNING(sLogger, ("failed to get cpu lines", errorMessage)("ret", ret)("cpuLines", cpuLines.size())); + if (ret != 0 || cpuLines.empty()) { + return duration_cast(system_clock::now().time_since_epoch()).count(); + } + + for (auto const& cpuLine : cpuLines) { + auto cpuMetric = SplitString(cpuLine); + if (cpuMetric.size() >= 2 && cpuMetric[0] == "btime") { + constexpr size_t bootTimeIndex = 1; + return bootTimeIndex < cpuMetric.size() ? StringTo(cpuMetric[bootTimeIndex]) : 0; + } + } + + systemBootSeconds = duration_cast(system_clock::now().time_since_epoch()).count(); + return systemBootSeconds; +} + +} // namespace logtail diff --git a/core/host_monitor/SystemInformationTools.h b/core/host_monitor/SystemInformationTools.h new file mode 100644 index 0000000000..c0e1e5189b --- /dev/null +++ b/core/host_monitor/SystemInformationTools.h @@ -0,0 +1,33 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "FileSystemUtil.h" +#include "StringTools.h" +#include "constants/EntityConstants.h" + +using namespace std::chrono; + +namespace logtail { + +int64_t GetSystemBootSeconds(); + +} // namespace logtail diff --git a/core/host_monitor/collector/BaseCollector.h b/core/host_monitor/collector/BaseCollector.h new file mode 100755 index 0000000000..73115e1a53 --- /dev/null +++ b/core/host_monitor/collector/BaseCollector.h @@ -0,0 +1,38 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "PipelineEventGroup.h" + +namespace logtail { + +class BaseCollector { +public: + BaseCollector() = default; + virtual ~BaseCollector() = default; + + bool IsValid() const { return mValidState; } + virtual void Collect(PipelineEventGroup& group) = 0; + + const std::string GetName() const { return mName; } + +protected: + std::string mName; + bool mValidState = false; +}; + +} // namespace logtail diff --git a/core/host_monitor/collector/ProcessCollector.cpp b/core/host_monitor/collector/ProcessCollector.cpp new file mode 100755 index 0000000000..c999c8444c --- /dev/null +++ b/core/host_monitor/collector/ProcessCollector.cpp @@ -0,0 +1,223 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ProcessCollector.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "FileSystemUtil.h" +#include "Logger.h" +#include "PipelineEventGroup.h" +#include "StringTools.h" +#include "constants/EntityConstants.h" +#include "host_monitor/SystemInformationTools.h" + +namespace logtail { + +const size_t ProcessTopN = 20; + +void ProcessCollector::Collect(PipelineEventGroup& group) { + group.SetMetadata(EventGroupMetaKey::COLLECT_TIME, std::to_string(time(nullptr))); + std::vector processes; + SortProcessByCpu(processes, ProcessTopN); + for (auto process : processes) { + auto event = group.AddLogEvent(); + time_t logtime = time(nullptr); + event->SetTimestamp(logtime); + + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PID, std::to_string(process->pid)); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PPID, std::to_string(process->parentPid)); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME, + std::to_string(duration_cast(process->startTime.time_since_epoch()).count())); + } +} + +void ProcessCollector::SortProcessByCpu(std::vector& processStats, size_t topN) { + steady_clock::time_point now = steady_clock::now(); + auto compare = [](const std::pair& a, const std::pair& b) { + return a.second < b.second; + }; + std::priority_queue, + std::vector>, + decltype(compare)> + queue(compare); + + int readCount = 0; + WalkAllProcess(PROCESS_DIR, [&](const std::string& dirName) { + if (++readCount > mProcessSilentCount) { + readCount = 0; + std::this_thread::sleep_for(milliseconds{100}); + } + auto pid = StringTo(dirName); + if (pid != 0) { + bool isFirstCollect = false; + auto ptr = GetProcessStat(pid, isFirstCollect); + if (ptr && !isFirstCollect) { + queue.push(std::make_pair(ptr, ptr->cpuInfo.total)); + } + if (queue.size() > topN) { + queue.pop(); + } + } + }); + + processStats.clear(); + processStats.reserve(queue.size()); + while (!queue.empty()) { + processStats.push_back(queue.top().first); + queue.pop(); + } + std::reverse(processStats.begin(), processStats.end()); + + if (processStats.empty()) { + LOG_INFO(sLogger, ("first collect Process Cpu info", "empty")); + } + LOG_DEBUG(sLogger, ("collect Process Cpu info, top", processStats.size())); + + mProcessSortTime = now; + mSortProcessStats = processStats; +} + +ProcessStatPtr ProcessCollector::GetProcessStat(pid_t pid, bool& isFirstCollect) { + const auto now = steady_clock::now(); + + // TODO: more accurate cache + auto prev = GetPreProcessStat(pid); + isFirstCollect = prev == nullptr || prev->lastTime.time_since_epoch().count() == 0; + // proc/[pid]/stat的统计粒度通常为10ms,两次采样之间需要足够大才能平滑。 + if (prev && now < prev->lastTime + seconds{1}) { + return prev; + } + auto ptr = ReadProcessStat(pid); + if (!ptr) { + return nullptr; + } + + // calculate CPU related fields + { + ptr->lastTime = now; + ptr->cpuInfo.user = ptr->utime.count(); + ptr->cpuInfo.sys = ptr->stime.count(); + ptr->cpuInfo.total = ptr->cpuInfo.user + ptr->cpuInfo.sys; + if (isFirstCollect || ptr->cpuInfo.total <= prev->cpuInfo.total) { + // first time called + ptr->cpuInfo.percent = 0.0; + } else { + auto totalDiff = static_cast(ptr->cpuInfo.total - prev->cpuInfo.total); + auto timeDiff = static_cast(ptr->lastTime.time_since_epoch().count() + - prev->lastTime.time_since_epoch().count()); + ptr->cpuInfo.percent = totalDiff / timeDiff * 100; + } + } + + prev = ptr; + mPrevProcessStat[pid] = ptr; + return ptr; +} + +ProcessStatPtr ProcessCollector::ReadProcessStat(pid_t pid) { + LOG_DEBUG(sLogger, ("read process stat", pid)); + auto processStat = PROCESS_DIR / std::to_string(pid) / PROCESS_STAT; + + std::string line; + if (!ReadFileContent(processStat.string(), line)) { + LOG_ERROR(sLogger, ("read process stat", "fail")("file", processStat)); + return nullptr; + } + return ParseProcessStat(pid, line); +} + + +// 数据样例: /proc/1/stat +// 1 (cat) R 0 1 1 34816 1 4194560 1110 0 0 0 1 1 0 0 20 0 1 0 18938584 4505600 171 18446744073709551615 4194304 4238788 +// 140727020025920 0 0 0 0 0 0 0 0 0 17 3 0 0 0 0 0 6336016 6337300 21442560 140727020027760 140727020027777 +// 140727020027777 140727020027887 0 +ProcessStatPtr ProcessCollector::ParseProcessStat(pid_t pid, std::string& line) { + ProcessStatPtr ptr = std::make_shared(); + ptr->pid = pid; + auto nameStartPos = line.find_first_of('('); + auto nameEndPos = line.find_last_of(')'); + if (nameStartPos == std::string::npos || nameEndPos == std::string::npos) { + LOG_ERROR(sLogger, ("can't find process name", pid)("stat", line)); + return nullptr; + } + nameStartPos++; // 跳过左括号 + ptr->name = line.substr(nameStartPos, nameEndPos - nameStartPos); + line = line.substr(nameEndPos + 2); // 跳过右括号及空格 + + std::vector words = SplitString(line); + + constexpr const EnumProcessStat offset = EnumProcessStat::state; // 跳过pid, comm + constexpr const int minCount = EnumProcessStat::processor - offset + 1; // 37 + if (words.size() < minCount) { + LOG_ERROR(sLogger, ("unexpected item count", pid)("stat", line)); + return nullptr; + } + + ptr->state = words[EnumProcessStat::state - offset].front(); + ptr->parentPid = StringTo(words[EnumProcessStat::ppid - offset]); + ptr->tty = StringTo(words[EnumProcessStat::tty_nr - offset]); + ptr->minorFaults = StringTo(words[EnumProcessStat::minflt - offset]); + ptr->majorFaults = StringTo(words[EnumProcessStat::majflt - offset]); + + ptr->utime = static_cast(StringTo(words[EnumProcessStat::utime - offset])); + ptr->stime = static_cast(StringTo(words[EnumProcessStat::stime - offset])); + ptr->cutime = static_cast(StringTo(words[EnumProcessStat::cutime - offset])); + ptr->cstime = static_cast(StringTo(words[EnumProcessStat::cstime - offset])); + + ptr->priority = StringTo(words[EnumProcessStat::priority - offset]); + ptr->nice = StringTo(words[EnumProcessStat::nice - offset]); + ptr->numThreads = StringTo(words[EnumProcessStat::num_threads - offset]); + + ptr->startTime = system_clock::time_point{ + static_cast(StringTo(words[EnumProcessStat::starttime - offset])) + + milliseconds{GetSystemBootSeconds() * 1000}}; + ptr->vSize = StringTo(words[EnumProcessStat::vsize - offset]); + ptr->rss = StringTo(words[EnumProcessStat::rss - offset]) << (getpagesize()); + ptr->processor = StringTo(words[EnumProcessStat::processor - offset]); + return ptr; +} + +bool ProcessCollector::WalkAllProcess(const std::filesystem::path& root, + const std::function& callback) { + if (!std::filesystem::exists(root) || !std::filesystem::is_directory(root)) { + LOG_ERROR(sLogger, ("ProcessCollector", "root path is not a directory or not exist")("root", root)); + return false; + } + + for (const auto& dirEntry : + std::filesystem::directory_iterator{root, std::filesystem::directory_options::skip_permission_denied}) { + std::string filename = dirEntry.path().filename().string(); + if (IsInt(filename)) { + callback(filename); + } + } + return true; +} + +} // namespace logtail diff --git a/core/host_monitor/collector/ProcessCollector.h b/core/host_monitor/collector/ProcessCollector.h new file mode 100755 index 0000000000..a7fa8c5aa4 --- /dev/null +++ b/core/host_monitor/collector/ProcessCollector.h @@ -0,0 +1,181 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "Flags.h" +#include "Logger.h" +#include "constants/EntityConstants.h" +#include "host_monitor/Constants.h" +#include "host_monitor/collector/BaseCollector.h" + +DECLARE_FLAG_INT32(process_collect_silent_count); + +using namespace std::chrono; + +namespace logtail { + + +struct ProcessCpuInfo { + uint64_t user = 0; + uint64_t sys = 0; + uint64_t total = 0; + double percent = 0.0; +}; + +struct ProcessStat { + pid_t pid = 0; + uint64_t vSize = 0; + uint64_t rss = 0; + uint64_t minorFaults = 0; + uint64_t majorFaults = 0; + pid_t parentPid = 0; + int tty = 0; + int priority = 0; + int nice = 0; + int numThreads = 0; + system_clock::time_point startTime; + steady_clock::time_point lastTime; + + milliseconds utime{0}; + milliseconds stime{0}; + milliseconds cutime{0}; + milliseconds cstime{0}; + ProcessCpuInfo cpuInfo; + + std::string name; + char state = '\0'; + int processor = 0; + + int64_t startMillis() const { return duration_cast(startTime.time_since_epoch()).count(); } +}; + +typedef std::shared_ptr ProcessStatPtr; + +// See https://man7.org/linux/man-pages/man5/proc.5.html +enum class EnumProcessStat : int { + pid, // 0 + comm, // 1 + state, // 2 + ppid, // 3 + pgrp, // 4 + session, // 5 + tty_nr, // 6 + tpgid, // 7 + flags, // 8 + minflt, // 9 + cminflt, // 10 + majflt, // 11 + cmajflt, // 12 + utime, // 13 + stime, // 14 + cutime, // 15 + cstime, // 16 + priority, // 17 + nice, // 18 + num_threads, // 19 + itrealvalue, // 20 + starttime, // 21 + vsize, // 22 + rss, // 23 + rsslim, // 24 + startcode, // 25 + endcode, // 26 + startstack, // 27 + kstkesp, // 28 + kstkeip, // 29 + signal, // 30 + blocked, // 31 + sigignore, // 32 + sigcatch, // 33 + wchan, // 34 + nswap, // 35 + cnswap, // 36 + exit_signal, // 37 + processor, // 38 <--- 至少需要有该字段 + rt_priority, // 39 + policy, // 40 + delayacct_blkio_ticks, // 41 + guest_time, // 42 + cguest_time, // 43 + start_data, // 44 + end_data, // 45 + start_brk, // 46 + arg_start, // 47 + arg_end, // 48 + env_start, // 49 + env_end, // 50 + exit_code, // 51 + + _count, // 只是用于计数,非实际字段 +}; +static_assert((int)EnumProcessStat::comm == 1, "EnumProcessStat invalid"); +static_assert((int)EnumProcessStat::processor == 38, "EnumProcessStat invalid"); + +constexpr int operator-(EnumProcessStat a, EnumProcessStat b) { + return (int)a - (int)b; +} + +class ProcessCollector : public BaseCollector { +public: + ProcessCollector() : mProcessSilentCount(INT32_FLAG(process_collect_silent_count)) { + mName = "process"; + + // try to read process dir + if (access(PROCESS_DIR.c_str(), R_OK) != 0) { + LOG_ERROR(sLogger, ("process collector init failed", "process dir not exist or ")("dir", PROCESS_DIR)); + mValidState = false; + } else { + mValidState = true; + } + }; + ~ProcessCollector() override = default; + + void Collect(PipelineEventGroup& group) override; + +private: + void SortProcessByCpu(std::vector& processStats, size_t topN); + ProcessStatPtr GetProcessStat(pid_t pid, bool& isFirstCollect); + ProcessStatPtr ReadProcessStat(pid_t pid); + ProcessStatPtr ParseProcessStat(pid_t pid, std::string& line); + + bool WalkAllProcess(const std::filesystem::path& root, const std::function& callback); + + ProcessStatPtr GetPreProcessStat(pid_t pid) { return mPrevProcessStat[pid]; } + + steady_clock::time_point mProcessSortTime; + std::vector mSortProcessStats; + std::unordered_map mPrevProcessStat; + + const int mProcessSilentCount; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class ProcessCollectorUnittest; +#endif +}; + +} // namespace logtail diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index 5831a581d1..885f343ba9 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -20,8 +20,8 @@ #include #include "checkpoint/RangeCheckpoint.h" -#include "constants/Constants.h" #include "common/memory/SourceBuffer.h" +#include "constants/Constants.h" #include "models/PipelineEventPtr.h" namespace logtail { @@ -58,6 +58,8 @@ enum class EventGroupMetaKey { PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC, PROMETHEUS_UP_STATE, + COLLECT_TIME, + SOURCE_ID }; diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index d6a55911c7..643efdc506 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -126,6 +126,7 @@ class Pipeline { friend class InputProcessSecurityUnittest; friend class InputNetworkSecurityUnittest; friend class InputNetworkObserverUnittest; + friend class InputHostMetaUnittest; #endif }; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 57abcde874..be6ce21c1b 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -16,6 +16,7 @@ #include "pipeline/PipelineManager.h" +#include "HostMonitorInputRunner.h" #include "file_server/ConfigManager.h" #include "file_server/FileServer.h" #include "go_pipeline/LogtailPlugin.h" @@ -40,7 +41,8 @@ PipelineManager::PipelineManager() : mInputRunners({ PrometheusInputRunner::GetInstance(), #if defined(__linux__) && !defined(__ANDROID__) - ebpf::eBPFServer::GetInstance(), + ebpf::eBPFServer::GetInstance(), + HostMonitorInputRunner::GetInstance(), #endif }) { } @@ -197,6 +199,7 @@ void PipelineManager::StopAllPipelines() { LogtailPlugin::GetInstance()->StopAllPipelines(true); + Timer::GetInstance()->Stop(); ProcessorRunner::GetInstance()->Stop(); FlushAllBatch(); diff --git a/core/pipeline/plugin/PluginRegistry.cpp b/core/pipeline/plugin/PluginRegistry.cpp index bf0d7acbe6..ee6d314fc5 100644 --- a/core/pipeline/plugin/PluginRegistry.cpp +++ b/core/pipeline/plugin/PluginRegistry.cpp @@ -30,7 +30,9 @@ #include "plugin/flusher/sls/FlusherSLS.h" #include "plugin/input/InputContainerStdio.h" #include "plugin/input/InputFile.h" +#include "plugin/input/InputHostMeta.h" #include "plugin/input/InputPrometheus.h" +#include "plugin/processor/inner/ProcessorHostMetaNative.h" #if defined(__linux__) && !defined(__ANDROID__) #include "plugin/input/InputFileSecurity.h" #include "plugin/input/InputInternalMetrics.h" @@ -134,6 +136,7 @@ void PluginRegistry::LoadStaticPlugins() { RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator()); #endif RegisterProcessorCreator(new StaticProcessorCreator()); @@ -151,6 +154,7 @@ void PluginRegistry::LoadStaticPlugins() { RegisterProcessorCreator(new StaticProcessorCreator()); RegisterProcessorCreator(new StaticProcessorCreator()); RegisterProcessorCreator(new StaticProcessorCreator()); + RegisterProcessorCreator(new StaticProcessorCreator()); #if defined(__linux__) && !defined(__ANDROID__) && !defined(__EXCLUDE_SPL__) if (BOOL_FLAG(enable_processor_spl)) { RegisterProcessorCreator(new StaticProcessorCreator()); diff --git a/core/pipeline/queue/ProcessQueueManager.h b/core/pipeline/queue/ProcessQueueManager.h index dbe47efeaa..abe0f39c2b 100644 --- a/core/pipeline/queue/ProcessQueueManager.h +++ b/core/pipeline/queue/ProcessQueueManager.h @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface { void Clear(); friend class ProcessQueueManagerUnittest; friend class PipelineUnittest; + friend class HostMonitorInputRunnerUnittest; #endif }; diff --git a/core/plugin/input/InputHostMeta.cpp b/core/plugin/input/InputHostMeta.cpp new file mode 100644 index 0000000000..c62316117e --- /dev/null +++ b/core/plugin/input/InputHostMeta.cpp @@ -0,0 +1,67 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "InputHostMeta.h" + +#include +#include + +#include "HostMonitorInputRunner.h" +#include "Logger.h" +#include "PluginRegistry.h" +#include "ProcessorInstance.h" +#include "constants/EntityConstants.h" +#include "json/value.h" +#include "pipeline/Pipeline.h" +#include "plugin/processor/inner/ProcessorHostMetaNative.h" + +namespace logtail { + +const std::string InputHostMeta::sName = "input_host_meta"; + +bool InputHostMeta::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { + return CreateInnerProcessors(config); +} + +bool InputHostMeta::Start() { + LOG_INFO(sLogger, ("input host meta start", mContext->GetConfigName())); + HostMonitorInputRunner::GetInstance()->Init(); + HostMonitorInputRunner::GetInstance()->UpdateCollector( + mContext->GetConfigName(), {"process"}, mContext->GetProcessQueueKey(), mIndex); + return true; +} + +bool InputHostMeta::Stop(bool isPipelineRemoving) { + LOG_INFO(sLogger, ("input host meta stop", mContext->GetConfigName())); + if (isPipelineRemoving) { + HostMonitorInputRunner::GetInstance()->RemoveCollector(mContext->GetConfigName()); + } + return true; +} + +bool InputHostMeta::CreateInnerProcessors(const Json::Value& config) { + std::unique_ptr processor; + { + processor = PluginRegistry::GetInstance()->CreateProcessor(ProcessorHostMetaNative::sName, + mContext->GetPipeline().GenNextPluginMeta(false)); + Json::Value detail; + if (!processor->Init(detail, *mContext)) { + return false; + } + mInnerProcessors.emplace_back(std::move(processor)); + } + return true; +} + +} // namespace logtail diff --git a/core/plugin/input/InputHostMeta.h b/core/plugin/input/InputHostMeta.h new file mode 100644 index 0000000000..b31a901599 --- /dev/null +++ b/core/plugin/input/InputHostMeta.h @@ -0,0 +1,41 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "pipeline/plugin/interface/Input.h" + +namespace logtail { + +class InputHostMeta : public Input { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; + bool Start() override; + bool Stop(bool isPipelineRemoving) override; + bool SupportAck() const override { return true; } + +private: + bool CreateInnerProcessors(const Json::Value& config); + +#ifdef APSARA_UNIT_TEST_MAIN + friend class InputHostMetaUnittest; +#endif +}; + +} // namespace logtail diff --git a/core/plugin/processor/inner/ProcessorHostMetaNative.cpp b/core/plugin/processor/inner/ProcessorHostMetaNative.cpp new file mode 100644 index 0000000000..c64016a4d0 --- /dev/null +++ b/core/plugin/processor/inner/ProcessorHostMetaNative.cpp @@ -0,0 +1,115 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ProcessorHostMetaNative.h" + +#include +#include +#include +#include + +#include "Common.h" +#include "LogEvent.h" +#include "Logger.h" +#include "MachineInfoUtil.h" +#include "PipelineEventGroup.h" +#include "StringTools.h" +#include "constants/EntityConstants.h" + +namespace logtail { + +const std::string ProcessorHostMetaNative::sName = "processor_host_meta_native"; + +bool ProcessorHostMetaNative::Init(const Json::Value& config) { + auto hostType = ToString(getenv(DEFAULT_ENV_KEY_HOST_TYPE.c_str())); + std::ostringstream oss; + if (hostType == DEFAULT_ENV_VALUE_ECS) { + oss << DEFAULT_CONTENT_VALUE_DOMAIN_ACS << "." << DEFAULT_ENV_VALUE_ECS << "." + << DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS; + mDomain = DEFAULT_CONTENT_VALUE_DOMAIN_ACS; + mHostEntityID = FetchHostId(); + } else { + oss << DEFAULT_CONTENT_VALUE_DOMAIN_INFRA << "." << DEFAULT_ENV_VALUE_HOST << "." + << DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS; + mDomain = DEFAULT_CONTENT_VALUE_DOMAIN_INFRA; + mHostEntityID = GetHostIp(); + } + mEntityType = oss.str(); + return true; +} + +void ProcessorHostMetaNative::Process(PipelineEventGroup& group) { + EventsContainer& events = group.MutableEvents(); + EventsContainer newEvents; + + for (auto& event : events) { + ProcessEvent(group, std::move(event), newEvents); + } + events.swap(newEvents); +} + +bool ProcessorHostMetaNative::IsSupportedEvent(const PipelineEventPtr& event) const { + return event.Is(); +} + +void ProcessorHostMetaNative::ProcessEvent(PipelineEventGroup& group, + PipelineEventPtr&& e, + EventsContainer& newEvents) { + if (!IsSupportedEvent(e)) { + newEvents.emplace_back(std::move(e)); + return; + } + + auto& sourceEvent = e.Cast(); + std::unique_ptr targetEvent = group.CreateLogEvent(true); + targetEvent->SetTimestamp(sourceEvent.GetTimestamp()); + + // TODO: support host entity + targetEvent->SetContent(DEFAULT_CONTENT_KEY_ENTITY_TYPE, mEntityType); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_ENTITY_ID, + GetProcessEntityID(sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_PID), + sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME))); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_DOMAIN, mDomain); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME, + sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME)); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_LAST_OBSERVED_TIME, group.GetMetadata(EventGroupMetaKey::COLLECT_TIME)); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS, "30"); + // TODO: support delete event + targetEvent->SetContent(DEFAULT_CONTENT_KEY_METHOD, DEFAULT_CONTENT_VALUE_METHOD_UPDATE); + + targetEvent->SetContent("pid", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_PID)); + targetEvent->SetContent("ppid", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_PPID)); + targetEvent->SetContent("user", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_USER)); + targetEvent->SetContent("comm", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_COMM)); + targetEvent->SetContent("create_time", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME)); + targetEvent->SetContent("cwd", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CWD)); + targetEvent->SetContent("binary", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_BINARY)); + targetEvent->SetContent("arguments", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS)); + targetEvent->SetContent("language", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE)); + targetEvent->SetContent("containerID", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID)); + newEvents.emplace_back(std::move(targetEvent), true, nullptr); +} + + +const std::string ProcessorHostMetaNative::GetProcessEntityID(StringView pid, StringView createTime) { + std::ostringstream oss; + oss << mHostEntityID << pid << createTime; + auto bigID = sdk::CalcMD5(oss.str()); + std::transform(bigID.begin(), bigID.end(), bigID.begin(), ::tolower); + return bigID; +} + +} // namespace logtail diff --git a/core/plugin/processor/inner/ProcessorHostMetaNative.h b/core/plugin/processor/inner/ProcessorHostMetaNative.h new file mode 100644 index 0000000000..3b32e20b06 --- /dev/null +++ b/core/plugin/processor/inner/ProcessorHostMetaNative.h @@ -0,0 +1,48 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "PipelineEventPtr.h" +#include "Processor.h" + +namespace logtail { +class ProcessorHostMetaNative : public Processor { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config) override; + void Process(PipelineEventGroup& group) override; + +protected: + bool IsSupportedEvent(const PipelineEventPtr& event) const override; + +private: + void ProcessEvent(PipelineEventGroup& group, PipelineEventPtr&& e, EventsContainer& newEvents); + + const std::string GetProcessEntityID(StringView pid, StringView createTime); + + std::string mDomain; + std::string mEntityType; + std::string mHostEntityID; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class ProcessorHostMetaNativeUnittest; +#endif +}; + +} // namespace logtail diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index b7ef0beac7..2bd2e77cb9 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -245,17 +245,17 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { long statusCode = 0; curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode); request->mResponse.SetStatusCode(statusCode); - static_cast(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); - FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); - mOutSuccessfulItemsTotal->Add(1); - mSuccessfulItemTotalResponseTimeMs->Add(responseTime); - mSendingItemsTotal->Sub(1); LOG_DEBUG( sLogger, ("send http request succeeded, item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "response time", ToString(responseTimeMs) + "ms")("try cnt", ToString(request->mTryCnt))( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); + static_cast(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); + FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); + mOutSuccessfulItemsTotal->Add(1); + mSuccessfulItemTotalResponseTimeMs->Add(responseTime); + mSendingItemsTotal->Sub(1); break; } default: @@ -277,9 +277,6 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { ++runningHandlers; requestReused = true; } else { - static_cast(request->mItem->mFlusher) - ->OnSendDone(request->mResponse, request->mItem); - FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); LOG_DEBUG(sLogger, ("failed to send http request", "abort")("item address", request->mItem)( "config-flusher-dst", @@ -287,6 +284,9 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { "response time", ToString(responseTimeMs) + "ms")("try cnt", ToString(request->mTryCnt))( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); + static_cast(request->mItem->mFlusher) + ->OnSendDone(request->mResponse, request->mItem); + FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); } mOutFailedItemsTotal->Add(1); mFailedItemTotalResponseTimeMs->Add(responseTime); diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index 41f1601069..f77bb6998e 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -58,6 +58,7 @@ macro(add_core_subdir) add_subdirectory(prometheus) add_subdirectory(route) add_subdirectory(task_pipeline) + add_subdirectory(host_monitor) endmacro() macro(add_spl_subdir) diff --git a/core/unittest/host_monitor/1/stat b/core/unittest/host_monitor/1/stat new file mode 100644 index 0000000000..cbda9f715f --- /dev/null +++ b/core/unittest/host_monitor/1/stat @@ -0,0 +1 @@ +1 (cat) R 0 1 1 34816 1 4194560 1110 0 0 0 1 1 0 0 20 0 1 0 18938584 4505600 171 18446744073709551615 4194304 4238788 140727020025920 0 0 0 0 0 0 0 0 0 17 3 0 0 0 0 0 6336016 6337300 21442560 140727020027760 140727020027777 140727020027777 140727020027887 0 \ No newline at end of file diff --git a/core/unittest/host_monitor/CMakeLists.txt b/core/unittest/host_monitor/CMakeLists.txt new file mode 100644 index 0000000000..fd761c8c74 --- /dev/null +++ b/core/unittest/host_monitor/CMakeLists.txt @@ -0,0 +1,34 @@ +# Copyright 2023 iLogtail Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.22) +project(host_monitor_unittest) + +add_executable(process_collector_unittest ProcessCollectorUnittest.cpp) +target_link_libraries(process_collector_unittest ${UT_BASE_TARGET}) + +add_executable(host_monitor_input_runner_unittest HostMonitorInputRunnerUnittest.cpp) +target_link_libraries(host_monitor_input_runner_unittest ${UT_BASE_TARGET}) + +add_executable(system_information_tools_unittest SystemInformationToolsUnittest.cpp) +target_link_libraries(system_information_tools_unittest ${UT_BASE_TARGET}) + +file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/1) +file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/1/ DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/1/) +file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/stat DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) + +include(GoogleTest) +gtest_discover_tests(process_collector_unittest) +gtest_discover_tests(host_monitor_input_runner_unittest) +gtest_discover_tests(system_information_tools_unittest) diff --git a/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp b/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp new file mode 100644 index 0000000000..b2345bc113 --- /dev/null +++ b/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp @@ -0,0 +1,80 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "HostMonitorInputRunner.h" +#include "HostMonitorTimerEvent.h" +#include "ProcessQueueItem.h" +#include "ProcessQueueManager.h" +#include "QueueKey.h" +#include "QueueKeyManager.h" +#include "common/timer/Timer.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class HostMonitorInputRunnerUnittest : public testing::Test { +public: + void TestUpdateAndRemoveCollector() const; + void TestScheduleOnce() const; +}; + +void HostMonitorInputRunnerUnittest::TestUpdateAndRemoveCollector() const { + auto runner = HostMonitorInputRunner::GetInstance(); + runner->Init(); + runner->UpdateCollector("test", {"mock"}, QueueKey{}, 0); + APSARA_TEST_TRUE_FATAL(runner->IsCollectTaskValid("test", "mock")); + APSARA_TEST_TRUE_FATAL(runner->HasRegisteredPlugins()); + runner->RemoveCollector("test"); + APSARA_TEST_FALSE_FATAL(runner->IsCollectTaskValid("test", "mock")); + APSARA_TEST_FALSE_FATAL(runner->HasRegisteredPlugins()); + runner->Stop(); +} + +void HostMonitorInputRunnerUnittest::TestScheduleOnce() const { + auto runner = HostMonitorInputRunner::GetInstance(); + runner->Init(); + runner->mThreadPool.Start(); + std::string configName = "test"; + auto queueKey = QueueKeyManager::GetInstance()->GetKey(configName); + auto ctx = PipelineContext(); + ctx.SetConfigName(configName); + ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(queueKey, 0, ctx); + + auto collectConfig = std::make_unique( + configName, "process", queueKey, 0, std::chrono::seconds(1)); + runner->ScheduleOnce(std::move(collectConfig)); + std::this_thread::sleep_for(std::chrono::seconds(1)); + auto item = std::unique_ptr(new ProcessQueueItem(std::make_shared(), 0)); + ProcessQueueManager::GetInstance()->EnablePop(configName); + APSARA_TEST_TRUE_FATAL(ProcessQueueManager::GetInstance()->PopItem(0, item, configName)); + APSARA_TEST_EQUAL_FATAL("test", configName); + APSARA_TEST_TRUE_FATAL(item->mEventGroup.GetEvents().size() == 1); + + // verify schdule next + APSARA_TEST_EQUAL_FATAL(Timer::GetInstance()->mQueue.size(), 1); + runner->mThreadPool.Stop(); + runner->Stop(); +} + +UNIT_TEST_CASE(HostMonitorInputRunnerUnittest, TestUpdateAndRemoveCollector); +UNIT_TEST_CASE(HostMonitorInputRunnerUnittest, TestScheduleOnce); + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/host_monitor/ProcessCollectorUnittest.cpp b/core/unittest/host_monitor/ProcessCollectorUnittest.cpp new file mode 100644 index 0000000000..8e4c8c327c --- /dev/null +++ b/core/unittest/host_monitor/ProcessCollectorUnittest.cpp @@ -0,0 +1,58 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ProcessCollector.h" +#include "host_monitor/Constants.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class ProcessCollectorUnittest : public testing::Test { +public: + void TestReadProcessStat() const; + void TestSortProcessByCpu() const; +}; + +void ProcessCollectorUnittest::TestReadProcessStat() const { + PROCESS_DIR = "."; + auto collector = ProcessCollector(); + auto ptr = collector.ReadProcessStat(1); + APSARA_TEST_NOT_EQUAL(nullptr, ptr); + APSARA_TEST_EQUAL(1, ptr->pid); + APSARA_TEST_EQUAL("cat", ptr->name); +} + +void ProcessCollectorUnittest::TestSortProcessByCpu() const { + PROCESS_DIR = "/proc"; + auto collector = ProcessCollector(); + auto processes = vector(); + collector.SortProcessByCpu(processes, 5); // fist time will be ignored + collector.SortProcessByCpu(processes, 5); + APSARA_TEST_EQUAL(5, processes.size()); + auto prev = processes[0]; + for (auto i = 1; i < processes.size(); i++) { + auto process = processes[i]; + APSARA_TEST_TRUE(process->cpuInfo.percent <= prev->cpuInfo.percent); + prev = process; + } +} + +UNIT_TEST_CASE(ProcessCollectorUnittest, TestReadProcessStat); +UNIT_TEST_CASE(ProcessCollectorUnittest, TestSortProcessByCpu); + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/host_monitor/SystemInformationToolsUnittest.cpp b/core/unittest/host_monitor/SystemInformationToolsUnittest.cpp new file mode 100644 index 0000000000..5da4e004d8 --- /dev/null +++ b/core/unittest/host_monitor/SystemInformationToolsUnittest.cpp @@ -0,0 +1,38 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "SystemInformationTools.h" +#include "host_monitor/Constants.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class SystemInformationToolsUnittest : public testing::Test { +public: + void TestGetSystemBootSeconds() const; +}; + +void SystemInformationToolsUnittest::TestGetSystemBootSeconds() const { + PROCESS_DIR = "."; + APSARA_TEST_EQUAL(1731142542, GetSystemBootSeconds()); +} + + +UNIT_TEST_CASE(SystemInformationToolsUnittest, TestGetSystemBootSeconds); + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/host_monitor/stat b/core/unittest/host_monitor/stat new file mode 100644 index 0000000000..2499f9d324 --- /dev/null +++ b/core/unittest/host_monitor/stat @@ -0,0 +1 @@ +btime 1731142542 \ No newline at end of file diff --git a/core/unittest/input/CMakeLists.txt b/core/unittest/input/CMakeLists.txt index 49129244cf..3e18f939f5 100644 --- a/core/unittest/input/CMakeLists.txt +++ b/core/unittest/input/CMakeLists.txt @@ -36,6 +36,9 @@ target_link_libraries(input_ebpf_network_security_unittest unittest_base) add_executable(input_ebpf_network_observer_unittest InputNetworkObserverUnittest.cpp) target_link_libraries(input_ebpf_network_observer_unittest unittest_base) +add_executable(input_host_meat_unittest InputHostMetaUnittest.cpp) +target_link_libraries(input_host_meat_unittest unittest_base) + include(GoogleTest) gtest_discover_tests(input_file_unittest) gtest_discover_tests(input_container_stdio_unittest) @@ -44,3 +47,4 @@ gtest_discover_tests(input_ebpf_file_security_unittest) gtest_discover_tests(input_ebpf_process_security_unittest) gtest_discover_tests(input_ebpf_network_security_unittest) gtest_discover_tests(input_ebpf_network_observer_unittest) +gtest_discover_tests(input_host_meat_unittest) diff --git a/core/unittest/input/InputHostMetaUnittest.cpp b/core/unittest/input/InputHostMetaUnittest.cpp new file mode 100644 index 0000000000..9c284f945a --- /dev/null +++ b/core/unittest/input/InputHostMetaUnittest.cpp @@ -0,0 +1,131 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "PluginRegistry.h" +#include "common/JsonUtil.h" +#include "ebpf/config.h" +#include "pipeline/Pipeline.h" +#include "plugin/input/InputHostMeta.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class InputHostMetaUnittest : public testing::Test { +public: + void TestName(); + void TestSupportAck(); + void OnSuccessfulInit(); + void OnFailedInit(); + void OnSuccessfulStart(); + void OnSuccessfulStop(); + // void OnPipelineUpdate(); + +protected: + void SetUp() override { + p.mName = "test_config"; + ctx.SetConfigName("test_config"); + ctx.SetPipeline(p); + PluginRegistry::GetInstance()->LoadPlugins(); + } + +private: + Pipeline p; + PipelineContext ctx; +}; + +void InputHostMetaUnittest::TestName() { + InputHostMeta input; + std::string name = input.Name(); + APSARA_TEST_EQUAL(name, "input_host_meta"); +} + +void InputHostMetaUnittest::TestSupportAck() { + InputHostMeta input; + bool supportAck = input.SupportAck(); + APSARA_TEST_FALSE(supportAck); +} + +void InputHostMetaUnittest::OnSuccessfulInit() { + unique_ptr input; + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + + // valid optional param + configStr = R"( + { + "Type": "input_host_meta" + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + input.reset(new InputHostMeta()); + input->SetContext(ctx); + input->SetMetricsRecordRef("test", "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(input->sName, "input_host_meta"); +} + +void InputHostMetaUnittest::OnFailedInit() { +} + +void InputHostMetaUnittest::OnSuccessfulStart() { + unique_ptr input; + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + + configStr = R"( + { + "Type": "input_host_meta" + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + input.reset(new InputHostMeta()); + input->SetContext(ctx); + input->SetMetricsRecordRef("test", "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_TRUE(input->Start()); +} + +void InputHostMetaUnittest::OnSuccessfulStop() { + unique_ptr input; + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + + configStr = R"( + { + "Type": "input_host_meta" + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + input.reset(new InputHostMeta()); + input->SetContext(ctx); + input->SetMetricsRecordRef("test", "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_TRUE(input->Start()); + APSARA_TEST_TRUE(input->Stop(false)); +} + +UNIT_TEST_CASE(InputHostMetaUnittest, TestName) +UNIT_TEST_CASE(InputHostMetaUnittest, TestSupportAck) +UNIT_TEST_CASE(InputHostMetaUnittest, OnSuccessfulInit) +UNIT_TEST_CASE(InputHostMetaUnittest, OnFailedInit) +UNIT_TEST_CASE(InputHostMetaUnittest, OnSuccessfulStart) +UNIT_TEST_CASE(InputHostMetaUnittest, OnSuccessfulStop) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/processor/CMakeLists.txt b/core/unittest/processor/CMakeLists.txt index 39b5721186..3b4d3e0fcf 100644 --- a/core/unittest/processor/CMakeLists.txt +++ b/core/unittest/processor/CMakeLists.txt @@ -57,6 +57,9 @@ target_link_libraries(processor_parse_container_log_native_unittest ${UT_BASE_TA add_executable(processor_prom_parse_metric_native_unittest ProcessorPromParseMetricNativeUnittest.cpp) target_link_libraries(processor_prom_parse_metric_native_unittest unittest_base) +add_executable(processor_host_meta_native_unittest ProcessorHostMetaNativeUnittest.cpp) +target_link_libraries(processor_host_meta_native_unittest unittest_base) + include(GoogleTest) gtest_discover_tests(processor_split_log_string_native_unittest) gtest_discover_tests(processor_split_multiline_log_string_native_unittest) @@ -72,6 +75,7 @@ gtest_discover_tests(processor_desensitize_native_unittest) gtest_discover_tests(processor_merge_multiline_log_native_unittest) gtest_discover_tests(processor_parse_container_log_native_unittest) gtest_discover_tests(processor_prom_parse_metric_native_unittest) +gtest_discover_tests(processor_host_meta_native_unittest) add_executable(boost_regex_benchmark BoostRegexBenchmark.cpp) target_link_libraries(boost_regex_benchmark ${UT_BASE_TARGET}) diff --git a/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp b/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp new file mode 100644 index 0000000000..1d00798a32 --- /dev/null +++ b/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp @@ -0,0 +1,120 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "LogEvent.h" +#include "constants/EntityConstants.h" +#include "pipeline/Pipeline.h" +#include "plugin/processor/inner/ProcessorHostMetaNative.h" +#include "unittest/Unittest.h" + +namespace logtail { + +class ProcessorHostMetaNativeUnittest : public ::testing::Test { +public: + void TestInit(); + void TestProcess(); + void TestGetProcessEntityID(); + +protected: + void SetUp() override { mContext.SetConfigName("project##config_0"); } + +private: + PipelineContext mContext; +}; + +void ProcessorHostMetaNativeUnittest::TestInit() { + // make config + Json::Value config; + Pipeline pipeline; + mContext.SetPipeline(pipeline); + + { + setenv(DEFAULT_ENV_KEY_HOST_TYPE.c_str(), DEFAULT_ENV_VALUE_ECS.c_str(), 1); + ProcessorHostMetaNative processor; + processor.SetContext(mContext); + APSARA_TEST_TRUE_FATAL(processor.Init(config)); + APSARA_TEST_EQUAL_FATAL(processor.mDomain, DEFAULT_CONTENT_VALUE_DOMAIN_ACS); + APSARA_TEST_EQUAL_FATAL(processor.mEntityType, + DEFAULT_CONTENT_VALUE_DOMAIN_ACS + "." + DEFAULT_ENV_VALUE_ECS + "." + + DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS); + } + { + setenv(DEFAULT_ENV_KEY_HOST_TYPE.c_str(), DEFAULT_CONTENT_VALUE_DOMAIN_INFRA.c_str(), 1); + ProcessorHostMetaNative processor; + processor.SetContext(mContext); + APSARA_TEST_TRUE_FATAL(processor.Init(config)); + APSARA_TEST_EQUAL_FATAL(processor.mDomain, DEFAULT_CONTENT_VALUE_DOMAIN_INFRA); + APSARA_TEST_EQUAL_FATAL(processor.mEntityType, + DEFAULT_CONTENT_VALUE_DOMAIN_INFRA + "." + DEFAULT_ENV_VALUE_HOST + "." + + DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS); + } +} + +void ProcessorHostMetaNativeUnittest::TestProcess() { + // make config + Json::Value config; + auto sourceBuffer = std::make_shared(); + PipelineEventGroup eventGroup(sourceBuffer); + eventGroup.SetMetadataNoCopy(EventGroupMetaKey::COLLECT_TIME, "123456"); + auto event = eventGroup.AddLogEvent(); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PID, "123"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME, "123456"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PPID, "123"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_USER, "root"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_COMM, "comm"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CWD, "cwd"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_BINARY, "binary"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS, "arguments"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE, "language"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID, "container_id"); + + ProcessorHostMetaNative processor; + processor.SetContext(mContext); + APSARA_TEST_TRUE_FATAL(processor.Init(config)); + processor.Process(eventGroup); + APSARA_TEST_EQUAL_FATAL(eventGroup.GetEvents().size(), 1); + auto newEvent = eventGroup.GetEvents().front().Cast(); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_DOMAIN), processor.mDomain); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_ENTITY_TYPE), processor.mEntityType); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME), "123456"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS), "30"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_METHOD), DEFAULT_CONTENT_VALUE_METHOD_UPDATE); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("pid"), "123"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("ppid"), "123"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("user"), "root"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("comm"), "comm"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("create_time"), "123456"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("cwd"), "cwd"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("binary"), "binary"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("arguments"), "arguments"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("language"), "language"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("containerID"), "container_id"); +} + +void ProcessorHostMetaNativeUnittest::TestGetProcessEntityID() { + ProcessorHostMetaNative processor; + processor.Init(Json::Value()); + processor.mHostEntityID = "123"; + APSARA_TEST_EQUAL(processor.GetProcessEntityID("123", "123"), "f5bb0c8de146c67b44babbf4e6584cc0"); +} + +UNIT_TEST_CASE(ProcessorHostMetaNativeUnittest, TestInit) +UNIT_TEST_CASE(ProcessorHostMetaNativeUnittest, TestProcess) +UNIT_TEST_CASE(ProcessorHostMetaNativeUnittest, TestGetProcessEntityID) + +} // namespace logtail + +UNIT_TEST_MAIN