From 3e12438c1fb500f11609f0d9af920c14e268cd87 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Mon, 18 Nov 2024 10:00:51 +0800 Subject: [PATCH 1/2] feat: support host monitor --- core/CMakeLists.txt | 1 + core/app_config/AppConfig.cpp | 1 + core/common/FileSystemUtil.cpp | 44 ++++ core/common/FileSystemUtil.h | 17 ++ core/common/StringTools.cpp | 8 + core/common/StringTools.h | 6 + core/common/common.cmake | 2 +- core/common/timer/HostMonitorTimerEvent.cpp | 32 +++ core/common/timer/HostMonitorTimerEvent.h | 60 +++++ core/common/timer/Timer.h | 1 + core/common/timer/TimerEvent.h | 1 + core/constants/EntityConstants.cpp | 48 ++++ core/constants/EntityConstants.h | 48 ++++ core/host_monitor/Constants.cpp | 30 +++ core/host_monitor/Constants.h | 33 +++ core/host_monitor/HostMonitorInputRunner.cpp | 145 ++++++++++++ core/host_monitor/HostMonitorInputRunner.h | 77 ++++++ core/host_monitor/SystemInformationTools.cpp | 50 ++++ core/host_monitor/SystemInformationTools.h | 33 +++ core/host_monitor/collector/BaseCollector.h | 38 +++ .../collector/CollectorManager.cpp | 44 ++++ .../host_monitor/collector/CollectorManager.h | 44 ++++ core/host_monitor/collector/MockCollector.cpp | 33 +++ core/host_monitor/collector/MockCollector.h | 36 +++ .../collector/ProcessCollector.cpp | 223 ++++++++++++++++++ .../host_monitor/collector/ProcessCollector.h | 186 +++++++++++++++ core/models/PipelineEventGroup.h | 4 +- core/pipeline/Pipeline.h | 1 + core/pipeline/PipelineManager.cpp | 4 +- core/pipeline/plugin/PluginRegistry.cpp | 4 + core/pipeline/queue/ProcessQueueManager.h | 1 + core/plugin/input/InputHostMeta.cpp | 65 +++++ core/plugin/input/InputHostMeta.h | 46 ++++ .../inner/ProcessorHostMetaNative.cpp | 117 +++++++++ .../processor/inner/ProcessorHostMetaNative.h | 48 ++++ core/runner/sink/http/HttpSink.cpp | 16 +- core/unittest/CMakeLists.txt | 1 + core/unittest/host_monitor/1/stat | 1 + core/unittest/host_monitor/CMakeLists.txt | 38 +++ .../host_monitor/CollectorManagerUnittest.cpp | 42 ++++ .../HostMonitorInputRunnerUnittest.cpp | 78 ++++++ .../host_monitor/ProcessCollectorUnittest.cpp | 58 +++++ .../SystemInformationToolsUnittest.cpp | 38 +++ core/unittest/host_monitor/stat | 1 + core/unittest/input/CMakeLists.txt | 4 + core/unittest/input/InputHostMetaUnittest.cpp | 131 ++++++++++ core/unittest/processor/CMakeLists.txt | 4 + .../ProcessorHostMetaNativeUnittest.cpp | 120 ++++++++++ 48 files changed, 2052 insertions(+), 11 deletions(-) create mode 100644 core/common/timer/HostMonitorTimerEvent.cpp create mode 100644 core/common/timer/HostMonitorTimerEvent.h create mode 100644 core/constants/EntityConstants.cpp create mode 100644 core/constants/EntityConstants.h create mode 100644 core/host_monitor/Constants.cpp create mode 100644 core/host_monitor/Constants.h create mode 100644 core/host_monitor/HostMonitorInputRunner.cpp create mode 100644 core/host_monitor/HostMonitorInputRunner.h create mode 100644 core/host_monitor/SystemInformationTools.cpp create mode 100644 core/host_monitor/SystemInformationTools.h create mode 100644 core/host_monitor/collector/BaseCollector.h create mode 100644 core/host_monitor/collector/CollectorManager.cpp create mode 100644 core/host_monitor/collector/CollectorManager.h create mode 100644 core/host_monitor/collector/MockCollector.cpp create mode 100644 core/host_monitor/collector/MockCollector.h create mode 100644 core/host_monitor/collector/ProcessCollector.cpp create mode 100644 core/host_monitor/collector/ProcessCollector.h create mode 100644 core/plugin/input/InputHostMeta.cpp create mode 100644 core/plugin/input/InputHostMeta.h create mode 100644 core/plugin/processor/inner/ProcessorHostMetaNative.cpp create mode 100644 core/plugin/processor/inner/ProcessorHostMetaNative.h create mode 100644 core/unittest/host_monitor/1/stat create mode 100644 core/unittest/host_monitor/CMakeLists.txt create mode 100644 core/unittest/host_monitor/CollectorManagerUnittest.cpp create mode 100644 core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp create mode 100644 core/unittest/host_monitor/ProcessCollectorUnittest.cpp create mode 100644 core/unittest/host_monitor/SystemInformationToolsUnittest.cpp create mode 100644 core/unittest/host_monitor/stat create mode 100644 core/unittest/input/InputHostMetaUnittest.cpp create mode 100644 core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index b1b91aaf73..4a8ca74921 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -124,6 +124,7 @@ set(SUB_DIRECTORIES_LIST prometheus prometheus/labels prometheus/schedulers prometheus/async ebpf ebpf/observer ebpf/security ebpf/handler parser sls_control sdk + host_monitor host_monitor/collector ) if (LINUX) if (ENABLE_ENTERPRISE) diff --git a/core/app_config/AppConfig.cpp b/core/app_config/AppConfig.cpp index d492af6b7b..a660601346 100644 --- a/core/app_config/AppConfig.cpp +++ b/core/app_config/AppConfig.cpp @@ -164,6 +164,7 @@ DEFINE_FLAG_STRING(loong_collector_operator_service, "loong collector operator s DEFINE_FLAG_INT32(loong_collector_operator_service_port, "loong collector operator service port", 8888); DEFINE_FLAG_INT32(loong_collector_k8s_meta_service_port, "loong collector operator service port", 9000); DEFINE_FLAG_STRING(_pod_name_, "agent pod name", ""); +DEFINE_FLAG_INT32(process_collect_silent_count, "number of process scanned between a sleep", 1000); DEFINE_FLAG_STRING(app_info_file, "", "app_info.json"); DEFINE_FLAG_STRING(crash_stack_file_name, "crash stack back trace file name", "backtrace.dat"); diff --git a/core/common/FileSystemUtil.cpp b/core/common/FileSystemUtil.cpp index 39ef57c067..80d7e24fd0 100644 --- a/core/common/FileSystemUtil.cpp +++ b/core/common/FileSystemUtil.cpp @@ -141,6 +141,50 @@ bool ReadFileContent(const std::string& fileName, std::string& content, uint32_t return true; } +int GetLines(std::istream& is, + bool enableEmptyLine, + const std::function& pushBack, + std::string* errorMessage) { + std::string line; + // 此处必须判断eof,具体原因参见: + // https://stackoverflow.com/questions/40561482/getline-throws-basic-iosclear-exception-after-reading-the-last-line + while (!is.eof() && std::getline(is, line)) { + if (enableEmptyLine || !line.empty()) { + pushBack(line); + } + } + return 0; +} + +int GetLines(const bfs::path& filename, + bool enableEmptyLine, + const std::function& pushBack, + std::string* errorMessage) { + int ret = 0; + std::ifstream fin; + try { + fin.exceptions(std::ifstream::failbit | std::ifstream::badbit); + fin.open(filename.string(), std::ios_base::in); + fin.exceptions(std::ifstream::goodbit); + GetLines(fin, enableEmptyLine, pushBack, errorMessage); + fin.close(); + } catch (const std::exception& fail) { + if (errorMessage != nullptr) { + LOG_ERROR(sLogger, ("open file fail", filename)("errno", strerror(errno))); + ret = -1; + } + fin.close(); + } + return ret; +} + +int GetFileLines(const bfs::path& filename, + std::vector& res, + bool enableEmptyLine, + std::string* errorMessage) { + return GetLines(filename, enableEmptyLine, [&res](const std::string& s) { res.push_back(s); }, errorMessage); +} + bool OverwriteFile(const std::string& fileName, const std::string& content) { FILE* pFile = fopen(fileName.c_str(), "w"); if (pFile == NULL) { diff --git a/core/common/FileSystemUtil.h b/core/common/FileSystemUtil.h index 3ad1b046ed..8e0f523600 100644 --- a/core/common/FileSystemUtil.h +++ b/core/common/FileSystemUtil.h @@ -26,10 +26,14 @@ #elif defined(_MSC_VER) #include #endif +#include + #include "DevInode.h" #include "ErrorUtil.h" #include "LogtailCommonFlags.h" +namespace bfs = boost::filesystem; + // Filesystem utility. namespace logtail { @@ -87,6 +91,19 @@ void TrimLastSeperator(std::string& path); // ReadFileContent reads all content of @fileName to @content. bool ReadFileContent(const std::string& fileName, std::string& content, uint32_t maxFileSize = 8192); +int GetLines(std::istream& is, + bool enableEmptyLine, + const std::function& pushBack, + std::string* errorMessage); +int GetLines(const bfs::path& filename, + bool enableEmptyLine, + const std::function& pushBack, + std::string* errorMessage); +int GetFileLines(const bfs::path& filename, + std::vector& res, + bool enableEmptyLine = true, + std::string* errorMessage = nullptr); + // OverwriteFile overwrides @fileName with @content. bool OverwriteFile(const std::string& fileName, const std::string& content); diff --git a/core/common/StringTools.cpp b/core/common/StringTools.cpp index 0e800cb754..57b29cfde9 100644 --- a/core/common/StringTools.cpp +++ b/core/common/StringTools.cpp @@ -363,4 +363,12 @@ void RemoveFilePathTrailingSlash(std::string& filePath) { filePath = path.string(); } +bool IsInt(const char* sz) { + bool ok = (sz != nullptr && *sz != '\0'); + for (auto* it = reinterpret_cast(sz); ok && *it; ++it) { + ok = (0 != std::isdigit(*it)); + } + return ok; +} + } // namespace logtail diff --git a/core/common/StringTools.h b/core/common/StringTools.h index 4d519dc8fa..da20c45e73 100644 --- a/core/common/StringTools.h +++ b/core/common/StringTools.h @@ -146,6 +146,12 @@ bool NormalizeTopicRegFormat(std::string& regStr); void RemoveFilePathTrailingSlash(std::string& path); +bool IsInt(const char* sz); + +inline bool IsInt(const std::string& str) { + return IsInt(str.c_str()); +} + #if defined(_MSC_VER) // TODO: Test it. #define FNM_PATHNAME 0 diff --git a/core/common/common.cmake b/core/common/common.cmake index cd5e9401c6..94d715bf8d 100644 --- a/core/common/common.cmake +++ b/core/common/common.cmake @@ -29,7 +29,7 @@ list(APPEND THIS_SOURCE_FILES_LIST ${XX_HASH_SOURCE_FILES}) # add memory in common list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/memory/SourceBuffer.h) list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/http/AsynCurlRunner.cpp ${CMAKE_SOURCE_DIR}/common/http/Curl.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpResponse.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpRequest.cpp) -list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp) +list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp ${CMAKE_SOURCE_DIR}/common/timer/HostMonitorTimerEvent.cpp) list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/compression/Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/CompressorFactory.cpp ${CMAKE_SOURCE_DIR}/common/compression/LZ4Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/ZstdCompressor.cpp) # remove several files in common list(REMOVE_ITEM THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/BoostRegexValidator.cpp ${CMAKE_SOURCE_DIR}/common/GetUUID.cpp) diff --git a/core/common/timer/HostMonitorTimerEvent.cpp b/core/common/timer/HostMonitorTimerEvent.cpp new file mode 100644 index 0000000000..2c4fcbd355 --- /dev/null +++ b/core/common/timer/HostMonitorTimerEvent.cpp @@ -0,0 +1,32 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "HostMonitorTimerEvent.h" + +#include "HostMonitorInputRunner.h" + +namespace logtail { + +bool HostMonitorTimerEvent::IsValid() const { + return HostMonitorInputRunner::GetInstance()->IsCollectTaskValid(mConfigName, mCollectorName); +} + +bool HostMonitorTimerEvent::Execute() { + HostMonitorInputRunner::GetInstance()->ScheduleOnce(this); + return true; +} + +} // namespace logtail diff --git a/core/common/timer/HostMonitorTimerEvent.h b/core/common/timer/HostMonitorTimerEvent.h new file mode 100644 index 0000000000..e9f3c08457 --- /dev/null +++ b/core/common/timer/HostMonitorTimerEvent.h @@ -0,0 +1,60 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "QueueKey.h" +#include "timer/TimerEvent.h" + +namespace logtail { + +class HostMonitorTimerEvent : public TimerEvent { +public: + HostMonitorTimerEvent(std::chrono::steady_clock::time_point execTime, + size_t interval, + std::string configName, + std::string collectorName, + QueueKey processQueueKey) + : TimerEvent(execTime), + mConfigName(std::move(configName)), + mCollectorName(collectorName), + mProcessQueueKey(processQueueKey), + mInputIdx(0) { + mInterval = std::chrono::seconds(interval); + } + + bool IsValid() const override; + bool Execute() override; + + const std::string GetConfigName() const { return mConfigName; } + const std::string GetCollectorName() const { return mCollectorName; } + const QueueKey GetProcessQueueKey() const { return mProcessQueueKey; } + int GetInputIndex() const { return mInputIdx; } + const std::chrono::seconds GetInterval() const { return mInterval; } + void ResetForNextExec() { SetExecTime(GetExecTime() + mInterval); } + +private: + std::string mConfigName; + std::string mCollectorName; + QueueKey mProcessQueueKey; + int mInputIdx; + std::chrono::seconds mInterval; +}; + +} // namespace logtail diff --git a/core/common/timer/Timer.h b/core/common/timer/Timer.h index 6825e8443d..4b241e91d5 100644 --- a/core/common/timer/Timer.h +++ b/core/common/timer/Timer.h @@ -53,6 +53,7 @@ class Timer { #ifdef APSARA_UNIT_TEST_MAIN friend class TimerUnittest; friend class ScrapeSchedulerUnittest; + friend class HostMonitorInputRunnerUnittest; #endif }; diff --git a/core/common/timer/TimerEvent.h b/core/common/timer/TimerEvent.h index 005d60e882..e6064b667a 100644 --- a/core/common/timer/TimerEvent.h +++ b/core/common/timer/TimerEvent.h @@ -29,6 +29,7 @@ class TimerEvent { virtual bool Execute() = 0; std::chrono::steady_clock::time_point GetExecTime() const { return mExecTime; } + void SetExecTime(std::chrono::steady_clock::time_point nextExecTime) { mExecTime = nextExecTime; } private: std::chrono::steady_clock::time_point mExecTime; diff --git a/core/constants/EntityConstants.cpp b/core/constants/EntityConstants.cpp new file mode 100644 index 0000000000..17df9c2b5c --- /dev/null +++ b/core/constants/EntityConstants.cpp @@ -0,0 +1,48 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "EntityConstants.h" + +namespace logtail { + +const std::string DEFAULT_ENV_KEY_HOST_TYPE = "HOST_TYPE"; +const std::string DEFAULT_ENV_VALUE_ECS = "ecs"; +const std::string DEFAULT_ENV_VALUE_HOST = "host"; +const std::string DEFAULT_CONTENT_KEY_ENTITY_TYPE = "__entity_type__"; +const std::string DEFAULT_CONTENT_KEY_ENTITY_ID = "__entity_id__"; +const std::string DEFAULT_CONTENT_KEY_DOMAIN = "__domain__"; +const std::string DEFAULT_CONTENT_VALUE_DOMAIN_ACS = "acs"; +const std::string DEFAULT_CONTENT_VALUE_DOMAIN_INFRA = "infra"; +const std::string DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME = "__first_observed_time__"; +const std::string DEFAULT_CONTENT_KEY_LAST_OBSERVED_TIME = "__last_observed_time__"; +const std::string DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS = "__keep_alive_seconds__"; +const std::string DEFAULT_CONTENT_KEY_METHOD = "__method__"; +const std::string DEFAULT_CONTENT_VALUE_METHOD_UPDATE = "update"; +const std::string DEFAULT_CONTENT_VALUE_METHOD_EXPIRE = "expire"; + +// for process entity +const std::string DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS = "process"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_PID = "process_pid"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_PPID = "process_ppid"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_USER = "process_user"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_COMM = "process_comm"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME = "process_create_time"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_CWD = "process_cwd"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_BINARY = "process_binary"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS = "process_arguments"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE = "process_language"; +const std::string DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID = "process_container_id"; +} // namespace logtail diff --git a/core/constants/EntityConstants.h b/core/constants/EntityConstants.h new file mode 100644 index 0000000000..0da3a13920 --- /dev/null +++ b/core/constants/EntityConstants.h @@ -0,0 +1,48 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace logtail { + +extern const std::string DEFAULT_ENV_KEY_HOST_TYPE; +extern const std::string DEFAULT_ENV_VALUE_ECS; +extern const std::string DEFAULT_ENV_VALUE_HOST; +extern const std::string DEFAULT_CONTENT_KEY_ENTITY_TYPE; +extern const std::string DEFAULT_CONTENT_KEY_ENTITY_ID; +extern const std::string DEFAULT_CONTENT_KEY_DOMAIN; +extern const std::string DEFAULT_CONTENT_VALUE_DOMAIN_ACS; +extern const std::string DEFAULT_CONTENT_VALUE_DOMAIN_INFRA; +extern const std::string DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME; +extern const std::string DEFAULT_CONTENT_KEY_LAST_OBSERVED_TIME; +extern const std::string DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS; +extern const std::string DEFAULT_CONTENT_KEY_METHOD; +extern const std::string DEFAULT_CONTENT_VALUE_METHOD_UPDATE; +extern const std::string DEFAULT_CONTENT_VALUE_METHOD_EXPIRE; + +// for process entity +extern const std::string DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_PID; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_PPID; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_USER; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_COMM; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_CWD; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_BINARY; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE; +extern const std::string DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID; +} // namespace logtail diff --git a/core/host_monitor/Constants.cpp b/core/host_monitor/Constants.cpp new file mode 100644 index 0000000000..366b314480 --- /dev/null +++ b/core/host_monitor/Constants.cpp @@ -0,0 +1,30 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Constants.h" + + +namespace logtail { + +#ifndef APSARA_UNIT_TEST_MAIN +const bfs::path PROCESS_DIR = "/proc"; +#else +bfs::path PROCESS_DIR = "/proc"; +#endif + +const bfs::path PROCESS_STAT = "stat"; + +} // namespace logtail diff --git a/core/host_monitor/Constants.h b/core/host_monitor/Constants.h new file mode 100644 index 0000000000..788065b81e --- /dev/null +++ b/core/host_monitor/Constants.h @@ -0,0 +1,33 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace bfs = boost::filesystem; + +namespace logtail { + +#ifndef APSARA_UNIT_TEST_MAIN +extern const bfs::path PROCESS_DIR; +#else +extern bfs::path PROCESS_DIR; +#endif + +const extern bfs::path PROCESS_STAT; + +} // namespace logtail diff --git a/core/host_monitor/HostMonitorInputRunner.cpp b/core/host_monitor/HostMonitorInputRunner.cpp new file mode 100644 index 0000000000..ab2618623e --- /dev/null +++ b/core/host_monitor/HostMonitorInputRunner.cpp @@ -0,0 +1,145 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "HostMonitorInputRunner.h" + +#include +#include + +#include "CollectorManager.h" +#include "Lock.h" +#include "LogEvent.h" +#include "Logger.h" +#include "ProcessQueueItem.h" +#include "ProcessQueueManager.h" +#include "ThreadPool.h" +#include "timer/HostMonitorTimerEvent.h" +#include "timer/TimerEvent.h" + + +namespace logtail { + +HostMonitorInputRunner::HostMonitorInputRunner() { + mTimer = std::make_shared(); + mThreadPool = std::make_shared(3); +} + +void HostMonitorInputRunner::UpdateCollector(const std::string& configName, + const std::vector& collectorNames, + QueueKey processQueueKey) { + WriteLock lock(mCollectorMapRWLock); + mCollectorMap[configName] = collectorNames; + for (const auto& collectorName : collectorNames) { + LOG_INFO(sLogger, ("add new host monitor collector", configName)("collector", collectorName)); + mTimer->PushEvent(BuildTimerEvent(configName, collectorName, processQueueKey)); + } +} + +void HostMonitorInputRunner::RemoveCollector(const std::string& configName) { + WriteLock lock(mCollectorMapRWLock); + mCollectorMap.erase(configName); +} + +void HostMonitorInputRunner::Init() { + std::lock_guard lock(mStartMutex); + if (mIsStarted) { + return; + } + LOG_INFO(sLogger, ("HostMonitorInputRunner", "Start")); + mIsStarted = true; + +#ifndef APSARA_UNIT_TEST_MAIN + mThreadPool->Start(); + mTimer->Init(); +#endif +} + +void HostMonitorInputRunner::Stop() { + std::lock_guard lock(mStartMutex); + if (!mIsStarted) { + return; + } + + mIsStarted = false; +#ifndef APSARA_UNIT_TEST_MAIN + mTimer->Stop(); + mThreadPool->Stop(); +#endif + LOG_INFO(sLogger, ("HostMonitorInputRunner", "Stop")); +} + +bool HostMonitorInputRunner::HasRegisteredPlugins() const { + ReadLock lock(mCollectorMapRWLock); + return !mCollectorMap.empty(); +} + +bool HostMonitorInputRunner::IsCollectTaskValid(const std::string& configName, const std::string& collectorName) const { + ReadLock lock(mCollectorMapRWLock); + auto collectors = mCollectorMap.find(configName); + if (collectors == mCollectorMap.end()) { + return false; + } + for (const auto& collectorName : collectors->second) { + if (collectorName == collectorName) { + return true; + } + } + return false; +} + +void HostMonitorInputRunner::ScheduleOnce(HostMonitorTimerEvent* event) { + // TODO: reuse event + HostMonitorTimerEvent eventCopy = *event; + mThreadPool->Add([this, eventCopy]() mutable { + auto configName = eventCopy.GetConfigName(); + auto collectorName = eventCopy.GetCollectorName(); + auto processQueueKey = eventCopy.GetProcessQueueKey(); + PipelineEventGroup group(std::make_shared()); + auto collector = CollectorManager::GetInstance()->GetCollector(collectorName); + collector->Collect(group); + + std::unique_ptr item + = std::make_unique(std::move(group), eventCopy.GetInputIndex()); + if (ProcessQueueManager::GetInstance()->IsValidToPush(processQueueKey)) { + ProcessQueueManager::GetInstance()->PushQueue(processQueueKey, std::move(item)); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // try again + if (ProcessQueueManager::GetInstance()->IsValidToPush(processQueueKey)) { + ProcessQueueManager::GetInstance()->PushQueue(processQueueKey, std::move(item)); + } else { + LOG_WARNING(sLogger, ("process queue is full", "discard data")("config", configName)); + } + } + + LOG_DEBUG(sLogger, ("schedule host monitor collector again", configName)("collector", collectorName)); + + eventCopy.ResetForNextExec(); + mTimer->PushEvent(std::make_unique(eventCopy)); + }); +} + +std::unique_ptr HostMonitorInputRunner::BuildTimerEvent(const std::string& configName, + const std::string& collectorName, + QueueKey processQueueKey) { + auto now = std::chrono::steady_clock::now(); + auto event = std::make_unique( + now, DEFAULT_SCHEDULE_INTERVAL, configName, collectorName, processQueueKey); + return event; +} + + +} // namespace logtail diff --git a/core/host_monitor/HostMonitorInputRunner.h b/core/host_monitor/HostMonitorInputRunner.h new file mode 100644 index 0000000000..5aceb92c30 --- /dev/null +++ b/core/host_monitor/HostMonitorInputRunner.h @@ -0,0 +1,77 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "InputRunner.h" +#include "Lock.h" +#include "QueueKey.h" +#include "ThreadPool.h" +#include "timer/HostMonitorTimerEvent.h" +#include "timer/Timer.h" + +namespace logtail { + +const int DEFAULT_SCHEDULE_INTERVAL = 10; + +class HostMonitorInputRunner : public InputRunner { +public: + HostMonitorInputRunner(const HostMonitorInputRunner&) = delete; + HostMonitorInputRunner(HostMonitorInputRunner&&) = delete; + HostMonitorInputRunner& operator=(const HostMonitorInputRunner&) = delete; + HostMonitorInputRunner& operator=(HostMonitorInputRunner&&) = delete; + ~HostMonitorInputRunner() override = default; + static HostMonitorInputRunner* GetInstance() { + static HostMonitorInputRunner sInstance; + return &sInstance; + } + + void UpdateCollector(const std::string& configName, + const std::vector& collectorNames, + QueueKey processQueueKey); + void RemoveCollector(const std::string& configName); + + void Init() override; + void Stop() override; + bool HasRegisteredPlugins() const override; + + bool IsCollectTaskValid(const std::string& configName, const std::string& collectorName) const; + void ScheduleOnce(HostMonitorTimerEvent* event); + +private: + HostMonitorInputRunner(); + std::unique_ptr + BuildTimerEvent(const std::string& configName, const std::string& collectorName, QueueKey processQueueKey); + + bool mIsStarted = false; + std::mutex mStartMutex; + + std::shared_ptr mTimer; + std::shared_ptr mThreadPool; + + mutable ReadWriteLock mCollectorMapRWLock; + std::unordered_map> mCollectorMap; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class HostMonitorInputRunnerUnittest; +#endif +}; + +} // namespace logtail diff --git a/core/host_monitor/SystemInformationTools.cpp b/core/host_monitor/SystemInformationTools.cpp new file mode 100644 index 0000000000..edbad31cab --- /dev/null +++ b/core/host_monitor/SystemInformationTools.cpp @@ -0,0 +1,50 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SystemInformationTools.h" + +#include "Logger.h" +#include "host_monitor/Constants.h" + +namespace logtail { + +int64_t GetSystemBootSeconds() { + static int64_t systemBootSeconds; + if (systemBootSeconds != 0) { + return systemBootSeconds; + } + + std::vector cpuLines = {}; + std::string errorMessage; + int ret = GetFileLines(PROCESS_DIR / PROCESS_STAT, cpuLines, true, &errorMessage); + LOG_WARNING(sLogger, ("failed to get cpu lines", errorMessage)("ret", ret)("cpuLines", cpuLines.size())); + if (ret != 0 || cpuLines.empty()) { + return duration_cast(system_clock::now().time_since_epoch()).count(); + } + + for (auto const& cpuLine : cpuLines) { + auto cpuMetric = SplitString(cpuLine); + if (cpuMetric.size() >= 2 && cpuMetric[0] == "btime") { + constexpr size_t bootTimeIndex = 1; + return bootTimeIndex < cpuMetric.size() ? StringTo(cpuMetric[bootTimeIndex]) : 0; + } + } + + systemBootSeconds = duration_cast(system_clock::now().time_since_epoch()).count(); + return systemBootSeconds; +} + +} // namespace logtail diff --git a/core/host_monitor/SystemInformationTools.h b/core/host_monitor/SystemInformationTools.h new file mode 100644 index 0000000000..c0e1e5189b --- /dev/null +++ b/core/host_monitor/SystemInformationTools.h @@ -0,0 +1,33 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "FileSystemUtil.h" +#include "StringTools.h" +#include "constants/EntityConstants.h" + +using namespace std::chrono; + +namespace logtail { + +int64_t GetSystemBootSeconds(); + +} // namespace logtail diff --git a/core/host_monitor/collector/BaseCollector.h b/core/host_monitor/collector/BaseCollector.h new file mode 100644 index 0000000000..73115e1a53 --- /dev/null +++ b/core/host_monitor/collector/BaseCollector.h @@ -0,0 +1,38 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "PipelineEventGroup.h" + +namespace logtail { + +class BaseCollector { +public: + BaseCollector() = default; + virtual ~BaseCollector() = default; + + bool IsValid() const { return mValidState; } + virtual void Collect(PipelineEventGroup& group) = 0; + + const std::string GetName() const { return mName; } + +protected: + std::string mName; + bool mValidState = false; +}; + +} // namespace logtail diff --git a/core/host_monitor/collector/CollectorManager.cpp b/core/host_monitor/collector/CollectorManager.cpp new file mode 100644 index 0000000000..9a9c865b85 --- /dev/null +++ b/core/host_monitor/collector/CollectorManager.cpp @@ -0,0 +1,44 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "CollectorManager.h" + +#include "BaseCollector.h" +#include "MockCollector.h" +#include "host_monitor/collector/ProcessCollector.h" + +namespace logtail { + +CollectorManager::CollectorManager() { + RegisterCollector(); + RegisterCollector(); +} + +std::shared_ptr CollectorManager::GetCollector(const std::string& collectorName) { + auto it = mCollectorMap.find(collectorName); + if (it == mCollectorMap.end()) { + return nullptr; + } + return it->second; +} + +template +void CollectorManager::RegisterCollector() { + auto collector = std::make_shared(); + mCollectorMap[collector->GetName()] = collector; +} + +} // namespace logtail diff --git a/core/host_monitor/collector/CollectorManager.h b/core/host_monitor/collector/CollectorManager.h new file mode 100644 index 0000000000..2882bc5ef2 --- /dev/null +++ b/core/host_monitor/collector/CollectorManager.h @@ -0,0 +1,44 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "host_monitor/collector/BaseCollector.h" +namespace logtail { + +class CollectorManager { +public: + static CollectorManager* GetInstance() { + static CollectorManager sInstance; + return &sInstance; + } + + std::shared_ptr GetCollector(const std::string& collectorName); + +private: + CollectorManager(); + ~CollectorManager() = default; + + template + void RegisterCollector(); + + std::unordered_map> mCollectorMap; +}; + +} // namespace logtail diff --git a/core/host_monitor/collector/MockCollector.cpp b/core/host_monitor/collector/MockCollector.cpp new file mode 100644 index 0000000000..c9046b5d43 --- /dev/null +++ b/core/host_monitor/collector/MockCollector.cpp @@ -0,0 +1,33 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "MockCollector.h" + +#include + +#include "PipelineEventGroup.h" +#include "constants/EntityConstants.h" + +namespace logtail { + +void MockCollector::Collect(PipelineEventGroup& group) { + auto event = group.AddLogEvent(); + event->SetTimestamp(time(nullptr)); + std::string content = "mock content"; + event->SetContent("mock", content); +} + +} // namespace logtail diff --git a/core/host_monitor/collector/MockCollector.h b/core/host_monitor/collector/MockCollector.h new file mode 100644 index 0000000000..48f8292ae8 --- /dev/null +++ b/core/host_monitor/collector/MockCollector.h @@ -0,0 +1,36 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "constants/EntityConstants.h" +#include "host_monitor/collector/BaseCollector.h" + +namespace logtail { + +class MockCollector : public BaseCollector { +public: + MockCollector() { mName = "mock"; }; + ~MockCollector() override = default; + + void Collect(PipelineEventGroup& group) override; + +private: +}; + +} // namespace logtail diff --git a/core/host_monitor/collector/ProcessCollector.cpp b/core/host_monitor/collector/ProcessCollector.cpp new file mode 100644 index 0000000000..5479f7909b --- /dev/null +++ b/core/host_monitor/collector/ProcessCollector.cpp @@ -0,0 +1,223 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ProcessCollector.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "FileSystemUtil.h" +#include "Logger.h" +#include "PipelineEventGroup.h" +#include "StringTools.h" +#include "constants/EntityConstants.h" +#include "host_monitor/SystemInformationTools.h" + +namespace logtail { + +const size_t ProcessTopN = 20; + +void ProcessCollector::Collect(PipelineEventGroup& group) { + std::lock_guard lock(mCollectLock); + + group.SetMetadata(EventGroupMetaKey::HOST_MONITOR_COLLECT_TIME, std::to_string(time(nullptr))); + std::vector processes; + SortProcessByCpu(processes, ProcessTopN); + for (auto process : processes) { + auto event = group.AddLogEvent(); + time_t logtime = time(nullptr); + event->SetTimestamp(logtime); + + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PID, std::to_string(process->pid)); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PPID, std::to_string(process->parentPid)); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME, + std::to_string(duration_cast(process->startTime.time_since_epoch()).count())); + } +} + +void ProcessCollector::SortProcessByCpu(std::vector& processStats, size_t topN) { + steady_clock::time_point now = steady_clock::now(); + auto compare = [](const std::pair& a, const std::pair& b) { + return a.second < b.second; + }; + std::priority_queue, + std::vector>, + decltype(compare)> + queue(compare); + + int readCount = 0; + WalkAllProcess(PROCESS_DIR, [&](const std::string& dirName) { + if (++readCount > mProcessSilentCount) { + readCount = 0; + std::this_thread::sleep_for(milliseconds{100}); + } + auto pid = StringTo(dirName); + if (pid != 0) { + bool isFirstCollect = false; + auto ptr = GetProcessStat(pid, isFirstCollect); + if (ptr && !isFirstCollect) { + queue.push(std::make_pair(ptr, ptr->cpuInfo.total)); + } + if (queue.size() > topN) { + queue.pop(); + } + } + }); + + processStats.clear(); + processStats.reserve(queue.size()); + while (!queue.empty()) { + processStats.push_back(queue.top().first); + queue.pop(); + } + std::reverse(processStats.begin(), processStats.end()); + + if (processStats.empty()) { + LOG_INFO(sLogger, ("first collect Process Cpu info", "empty")); + } + LOG_DEBUG(sLogger, ("collect Process Cpu info, top", processStats.size())); + + mProcessSortTime = now; + mSortProcessStats = processStats; +} + +ProcessStatPtr ProcessCollector::GetProcessStat(pid_t pid, bool& isFirstCollect) { + const auto now = steady_clock::now(); + + // TODO: more accurate cache + auto prev = GetPreProcessStat(pid); + isFirstCollect = prev == nullptr || prev->lastTime.time_since_epoch().count() == 0; + // proc/[pid]/stat的统计粒度通常为10ms,两次采样之间需要足够大才能平滑。 + if (prev && now < prev->lastTime + seconds{1}) { + return prev; + } + auto ptr = ReadProcessStat(pid); + if (!ptr) { + return nullptr; + } + + // calculate CPU related fields + { + ptr->lastTime = now; + ptr->cpuInfo.user = ptr->utime.count(); + ptr->cpuInfo.sys = ptr->stime.count(); + ptr->cpuInfo.total = ptr->cpuInfo.user + ptr->cpuInfo.sys; + if (isFirstCollect || ptr->cpuInfo.total <= prev->cpuInfo.total) { + // first time called + ptr->cpuInfo.percent = 0.0; + } else { + auto totalDiff = static_cast(ptr->cpuInfo.total - prev->cpuInfo.total); + auto timeDiff = static_cast(ptr->lastTime.time_since_epoch().count() + - prev->lastTime.time_since_epoch().count()); + ptr->cpuInfo.percent = totalDiff / timeDiff * 100; + } + } + + prev = ptr; + mPrevProcessStat[pid] = ptr; + return ptr; +} + +ProcessStatPtr ProcessCollector::ReadProcessStat(pid_t pid) { + LOG_DEBUG(sLogger, ("read process stat", pid)); + auto processStat = PROCESS_DIR / std::to_string(pid) / PROCESS_STAT; + + std::string line; + if (!ReadFileContent(processStat.string(), line)) { + LOG_ERROR(sLogger, ("read process stat", "fail")("file", processStat)); + return nullptr; + } + return ParseProcessStat(pid, line); +} + + +// 数据样例: /proc/1/stat +// 1 (cat) R 0 1 1 34816 1 4194560 1110 0 0 0 1 1 0 0 20 0 1 0 18938584 4505600 171 18446744073709551615 4194304 4238788 +// 140727020025920 0 0 0 0 0 0 0 0 0 17 3 0 0 0 0 0 6336016 6337300 21442560 140727020027760 140727020027777 +// 140727020027777 140727020027887 0 +ProcessStatPtr ProcessCollector::ParseProcessStat(pid_t pid, std::string& line) { + ProcessStatPtr ptr = std::make_shared(); + ptr->pid = pid; + auto nameStartPos = line.find_first_of('('); + auto nameEndPos = line.find_last_of(')'); + if (nameStartPos == std::string::npos || nameEndPos == std::string::npos) { + LOG_ERROR(sLogger, ("can't find process name", pid)("stat", line)); + return nullptr; + } + nameStartPos++; // 跳过左括号 + ptr->name = line.substr(nameStartPos, nameEndPos - nameStartPos); + line = line.substr(nameEndPos + 2); // 跳过右括号及空格 + + std::vector words = SplitString(line); + + constexpr const EnumProcessStat offset = EnumProcessStat::state; // 跳过pid, comm + constexpr const int minCount = EnumProcessStat::processor - offset + 1; // 37 + if (words.size() < minCount) { + LOG_ERROR(sLogger, ("unexpected item count", pid)("stat", line)); + return nullptr; + } + + ptr->state = words[EnumProcessStat::state - offset].front(); + ptr->parentPid = StringTo(words[EnumProcessStat::ppid - offset]); + ptr->tty = StringTo(words[EnumProcessStat::tty_nr - offset]); + ptr->minorFaults = StringTo(words[EnumProcessStat::minflt - offset]); + ptr->majorFaults = StringTo(words[EnumProcessStat::majflt - offset]); + + ptr->utime = static_cast(StringTo(words[EnumProcessStat::utime - offset])); + ptr->stime = static_cast(StringTo(words[EnumProcessStat::stime - offset])); + ptr->cutime = static_cast(StringTo(words[EnumProcessStat::cutime - offset])); + ptr->cstime = static_cast(StringTo(words[EnumProcessStat::cstime - offset])); + + ptr->priority = StringTo(words[EnumProcessStat::priority - offset]); + ptr->nice = StringTo(words[EnumProcessStat::nice - offset]); + ptr->numThreads = StringTo(words[EnumProcessStat::num_threads - offset]); + + ptr->startTime = system_clock::time_point{ + static_cast(StringTo(words[EnumProcessStat::starttime - offset])) + + milliseconds{GetSystemBootSeconds() * 1000}}; + ptr->vSize = StringTo(words[EnumProcessStat::vsize - offset]); + ptr->rss = StringTo(words[EnumProcessStat::rss - offset]) << (getpagesize()); + ptr->processor = StringTo(words[EnumProcessStat::processor - offset]); + return ptr; +} + +bool ProcessCollector::WalkAllProcess(const bfs::path& root, const std::function& callback) { + if (!bfs::exists(root) || !bfs::is_directory(root)) { + LOG_ERROR(sLogger, ("ProcessCollector", "root path is not a directory or not exist")("root", root)); + return false; + } + + for (const auto& dirEntry : bfs::directory_iterator{root, bfs::directory_options::skip_permission_denied}) { + std::string filename = dirEntry.path().filename().string(); + if (IsInt(filename)) { + callback(filename); + } + } + return true; +} + +} // namespace logtail diff --git a/core/host_monitor/collector/ProcessCollector.h b/core/host_monitor/collector/ProcessCollector.h new file mode 100644 index 0000000000..836f106812 --- /dev/null +++ b/core/host_monitor/collector/ProcessCollector.h @@ -0,0 +1,186 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Flags.h" +#include "Logger.h" +#include "MachineInfoUtil.h" +#include "StringTools.h" +#include "constants/EntityConstants.h" +#include "host_monitor/Constants.h" +#include "host_monitor/collector/BaseCollector.h" + +DECLARE_FLAG_INT32(process_collect_silent_count); + +namespace bfs = boost::filesystem; +using namespace std::chrono; + +namespace logtail { + + +struct ProcessCpuInfo { + uint64_t user = 0; + uint64_t sys = 0; + uint64_t total = 0; + double percent = 0.0; +}; + +struct ProcessStat { + pid_t pid = 0; + uint64_t vSize = 0; + uint64_t rss = 0; + uint64_t minorFaults = 0; + uint64_t majorFaults = 0; + pid_t parentPid = 0; + int tty = 0; + int priority = 0; + int nice = 0; + int numThreads = 0; + system_clock::time_point startTime; + steady_clock::time_point lastTime; + + milliseconds utime{0}; + milliseconds stime{0}; + milliseconds cutime{0}; + milliseconds cstime{0}; + ProcessCpuInfo cpuInfo; + + std::string name; + char state = '\0'; + int processor = 0; + + int64_t startMillis() const { return duration_cast(startTime.time_since_epoch()).count(); } +}; + +typedef std::shared_ptr ProcessStatPtr; + +// See https://man7.org/linux/man-pages/man5/proc.5.html +enum class EnumProcessStat : int { + pid, // 0 + comm, // 1 + state, // 2 + ppid, // 3 + pgrp, // 4 + session, // 5 + tty_nr, // 6 + tpgid, // 7 + flags, // 8 + minflt, // 9 + cminflt, // 10 + majflt, // 11 + cmajflt, // 12 + utime, // 13 + stime, // 14 + cutime, // 15 + cstime, // 16 + priority, // 17 + nice, // 18 + num_threads, // 19 + itrealvalue, // 20 + starttime, // 21 + vsize, // 22 + rss, // 23 + rsslim, // 24 + startcode, // 25 + endcode, // 26 + startstack, // 27 + kstkesp, // 28 + kstkeip, // 29 + signal, // 30 + blocked, // 31 + sigignore, // 32 + sigcatch, // 33 + wchan, // 34 + nswap, // 35 + cnswap, // 36 + exit_signal, // 37 + processor, // 38 <--- 至少需要有该字段 + rt_priority, // 39 + policy, // 40 + delayacct_blkio_ticks, // 41 + guest_time, // 42 + cguest_time, // 43 + start_data, // 44 + end_data, // 45 + start_brk, // 46 + arg_start, // 47 + arg_end, // 48 + env_start, // 49 + env_end, // 50 + exit_code, // 51 + + _count, // 只是用于计数,非实际字段 +}; +static_assert((int)EnumProcessStat::comm == 1, "EnumProcessStat invalid"); +static_assert((int)EnumProcessStat::processor == 38, "EnumProcessStat invalid"); + +constexpr int operator-(EnumProcessStat a, EnumProcessStat b) { + return (int)a - (int)b; +} + +class ProcessCollector : public BaseCollector { +public: + ProcessCollector() : mProcessSilentCount(INT32_FLAG(process_collect_silent_count)) { + mName = "process"; + + // try to read process dir + if (access(PROCESS_DIR.c_str(), R_OK) != 0) { + LOG_ERROR(sLogger, ("process collector init failed", "process dir not exist or ")("dir", PROCESS_DIR)); + mValidState = false; + } else { + mValidState = true; + } + }; + ~ProcessCollector() override = default; + + void Collect(PipelineEventGroup& group) override; + +private: + void SortProcessByCpu(std::vector& processStats, size_t topN); + ProcessStatPtr GetProcessStat(pid_t pid, bool& isFirstCollect); + ProcessStatPtr ReadProcessStat(pid_t pid); + ProcessStatPtr ParseProcessStat(pid_t pid, std::string& line); + + bool WalkAllProcess(const bfs::path& root, const std::function& callback); + + ProcessStatPtr GetPreProcessStat(pid_t pid) { return mPrevProcessStat[pid]; } + + std::mutex mCollectLock; + steady_clock::time_point mProcessSortTime; + std::vector mSortProcessStats; + std::unordered_map mPrevProcessStat; + + const int mProcessSilentCount; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class ProcessCollectorUnittest; +#endif +}; + +} // namespace logtail diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index 5831a581d1..be4508827e 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -20,8 +20,8 @@ #include #include "checkpoint/RangeCheckpoint.h" -#include "constants/Constants.h" #include "common/memory/SourceBuffer.h" +#include "constants/Constants.h" #include "models/PipelineEventPtr.h" namespace logtail { @@ -58,6 +58,8 @@ enum class EventGroupMetaKey { PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC, PROMETHEUS_UP_STATE, + HOST_MONITOR_COLLECT_TIME, + SOURCE_ID }; diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index d6a55911c7..643efdc506 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -126,6 +126,7 @@ class Pipeline { friend class InputProcessSecurityUnittest; friend class InputNetworkSecurityUnittest; friend class InputNetworkObserverUnittest; + friend class InputHostMetaUnittest; #endif }; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 57abcde874..183262da9c 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -16,6 +16,7 @@ #include "pipeline/PipelineManager.h" +#include "HostMonitorInputRunner.h" #include "file_server/ConfigManager.h" #include "file_server/FileServer.h" #include "go_pipeline/LogtailPlugin.h" @@ -40,7 +41,8 @@ PipelineManager::PipelineManager() : mInputRunners({ PrometheusInputRunner::GetInstance(), #if defined(__linux__) && !defined(__ANDROID__) - ebpf::eBPFServer::GetInstance(), + ebpf::eBPFServer::GetInstance(), + HostMonitorInputRunner::GetInstance(), #endif }) { } diff --git a/core/pipeline/plugin/PluginRegistry.cpp b/core/pipeline/plugin/PluginRegistry.cpp index bf0d7acbe6..ee6d314fc5 100644 --- a/core/pipeline/plugin/PluginRegistry.cpp +++ b/core/pipeline/plugin/PluginRegistry.cpp @@ -30,7 +30,9 @@ #include "plugin/flusher/sls/FlusherSLS.h" #include "plugin/input/InputContainerStdio.h" #include "plugin/input/InputFile.h" +#include "plugin/input/InputHostMeta.h" #include "plugin/input/InputPrometheus.h" +#include "plugin/processor/inner/ProcessorHostMetaNative.h" #if defined(__linux__) && !defined(__ANDROID__) #include "plugin/input/InputFileSecurity.h" #include "plugin/input/InputInternalMetrics.h" @@ -134,6 +136,7 @@ void PluginRegistry::LoadStaticPlugins() { RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator()); #endif RegisterProcessorCreator(new StaticProcessorCreator()); @@ -151,6 +154,7 @@ void PluginRegistry::LoadStaticPlugins() { RegisterProcessorCreator(new StaticProcessorCreator()); RegisterProcessorCreator(new StaticProcessorCreator()); RegisterProcessorCreator(new StaticProcessorCreator()); + RegisterProcessorCreator(new StaticProcessorCreator()); #if defined(__linux__) && !defined(__ANDROID__) && !defined(__EXCLUDE_SPL__) if (BOOL_FLAG(enable_processor_spl)) { RegisterProcessorCreator(new StaticProcessorCreator()); diff --git a/core/pipeline/queue/ProcessQueueManager.h b/core/pipeline/queue/ProcessQueueManager.h index dbe47efeaa..abe0f39c2b 100644 --- a/core/pipeline/queue/ProcessQueueManager.h +++ b/core/pipeline/queue/ProcessQueueManager.h @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface { void Clear(); friend class ProcessQueueManagerUnittest; friend class PipelineUnittest; + friend class HostMonitorInputRunnerUnittest; #endif }; diff --git a/core/plugin/input/InputHostMeta.cpp b/core/plugin/input/InputHostMeta.cpp new file mode 100644 index 0000000000..b8dd6c93fd --- /dev/null +++ b/core/plugin/input/InputHostMeta.cpp @@ -0,0 +1,65 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "InputHostMeta.h" + +#include +#include + +#include "HostMonitorInputRunner.h" +#include "Logger.h" +#include "PluginRegistry.h" +#include "ProcessorInstance.h" +#include "constants/EntityConstants.h" +#include "json/value.h" +#include "pipeline/Pipeline.h" +#include "plugin/processor/inner/ProcessorHostMetaNative.h" + +namespace logtail { + +const std::string InputHostMeta::sName = "input_host_meta"; + +bool InputHostMeta::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { + return CreateInnerProcessors(config); +} + +bool InputHostMeta::Start() { + LOG_INFO(sLogger, ("input host meta start", mContext->GetConfigName())); + HostMonitorInputRunner::GetInstance()->Init(); + HostMonitorInputRunner::GetInstance()->UpdateCollector( + mContext->GetConfigName(), {"process"}, mContext->GetProcessQueueKey()); + return true; +} + +bool InputHostMeta::Stop(bool isPipelineRemoving) { + LOG_INFO(sLogger, ("input host meta stop", mContext->GetConfigName())); + HostMonitorInputRunner::GetInstance()->RemoveCollector(mContext->GetConfigName()); + return true; +} + +bool InputHostMeta::CreateInnerProcessors(const Json::Value& config) { + std::unique_ptr processor; + { + processor = PluginRegistry::GetInstance()->CreateProcessor(ProcessorHostMetaNative::sName, + mContext->GetPipeline().GenNextPluginMeta(false)); + Json::Value detail; + if (!processor->Init(detail, *mContext)) { + return false; + } + mInnerProcessors.emplace_back(std::move(processor)); + } + return true; +} + +} // namespace logtail diff --git a/core/plugin/input/InputHostMeta.h b/core/plugin/input/InputHostMeta.h new file mode 100644 index 0000000000..94587705ba --- /dev/null +++ b/core/plugin/input/InputHostMeta.h @@ -0,0 +1,46 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "json/value.h" +#include "pipeline/plugin/interface/Input.h" + +namespace logtail { + +class InputHostMeta : public Input { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; + bool Start() override; + bool Stop(bool isPipelineRemoving) override; + bool SupportAck() const override { return false; } + +private: + bool CreateInnerProcessors(const Json::Value& config); + + std::string mDomain; + std::string mEntityType; + std::string mHostEntityID; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class InputHostMetaUnittest; +#endif +}; + +} // namespace logtail diff --git a/core/plugin/processor/inner/ProcessorHostMetaNative.cpp b/core/plugin/processor/inner/ProcessorHostMetaNative.cpp new file mode 100644 index 0000000000..1b8b845e40 --- /dev/null +++ b/core/plugin/processor/inner/ProcessorHostMetaNative.cpp @@ -0,0 +1,117 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ProcessorHostMetaNative.h" + +#include +#include +#include +#include + +#include "Common.h" +#include "LogEvent.h" +#include "Logger.h" +#include "MachineInfoUtil.h" +#include "PipelineEventGroup.h" +#include "StringTools.h" +#include "constants/EntityConstants.h" + +namespace logtail { + +const std::string ProcessorHostMetaNative::sName = "processor_host_meta_native"; + +bool ProcessorHostMetaNative::Init(const Json::Value& config) { + auto hostType = ToString(getenv(DEFAULT_ENV_KEY_HOST_TYPE.c_str())); + std::ostringstream oss; + if (hostType == DEFAULT_ENV_VALUE_ECS) { + oss << DEFAULT_CONTENT_VALUE_DOMAIN_ACS << "." << DEFAULT_ENV_VALUE_ECS << "." + << DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS; + ECSMeta ecsMeta = FetchECSMeta(); + mDomain = DEFAULT_CONTENT_VALUE_DOMAIN_ACS; + mHostEntityID = ecsMeta.instanceID; + } else { + oss << DEFAULT_CONTENT_VALUE_DOMAIN_INFRA << "." << DEFAULT_ENV_VALUE_HOST << "." + << DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS; + mDomain = DEFAULT_CONTENT_VALUE_DOMAIN_INFRA; + mHostEntityID = GetHostIp(); + } + mEntityType = oss.str(); + return true; +} + +void ProcessorHostMetaNative::Process(PipelineEventGroup& group) { + EventsContainer& events = group.MutableEvents(); + EventsContainer newEvents; + + for (auto& event : events) { + ProcessEvent(group, std::move(event), newEvents); + } + events.swap(newEvents); +} + +bool ProcessorHostMetaNative::IsSupportedEvent(const PipelineEventPtr& event) const { + return event.Is(); +} + +void ProcessorHostMetaNative::ProcessEvent(PipelineEventGroup& group, + PipelineEventPtr&& e, + EventsContainer& newEvents) { + if (!IsSupportedEvent(e)) { + newEvents.emplace_back(std::move(e)); + return; + } + + auto& sourceEvent = e.Cast(); + std::unique_ptr targetEvent = group.CreateLogEvent(true); + targetEvent->SetTimestamp(sourceEvent.GetTimestamp()); + + // TODO: support host entity + targetEvent->SetContent(DEFAULT_CONTENT_KEY_ENTITY_TYPE, mEntityType); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_ENTITY_ID, + GetProcessEntityID(sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_PID), + sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME))); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_DOMAIN, mDomain); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME, + sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME)); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_LAST_OBSERVED_TIME, + group.GetMetadata(EventGroupMetaKey::HOST_MONITOR_COLLECT_TIME)); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS, "30"); + // TODO: support delete event + targetEvent->SetContent(DEFAULT_CONTENT_KEY_METHOD, DEFAULT_CONTENT_VALUE_METHOD_UPDATE); + + targetEvent->SetContent("pid", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_PID)); + targetEvent->SetContent("ppid", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_PPID)); + targetEvent->SetContent("user", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_USER)); + targetEvent->SetContent("comm", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_COMM)); + targetEvent->SetContent("create_time", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME)); + targetEvent->SetContent("cwd", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CWD)); + targetEvent->SetContent("binary", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_BINARY)); + targetEvent->SetContent("arguments", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS)); + targetEvent->SetContent("language", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE)); + targetEvent->SetContent("containerID", sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID)); + newEvents.emplace_back(std::move(targetEvent), true, nullptr); +} + + +const std::string ProcessorHostMetaNative::GetProcessEntityID(StringView pid, StringView createTime) { + std::ostringstream oss; + oss << mHostEntityID << pid << createTime; + auto bigID = sdk::CalcMD5(oss.str()); + std::transform(bigID.begin(), bigID.end(), bigID.begin(), ::tolower); + return bigID; +} + +} // namespace logtail diff --git a/core/plugin/processor/inner/ProcessorHostMetaNative.h b/core/plugin/processor/inner/ProcessorHostMetaNative.h new file mode 100644 index 0000000000..3b32e20b06 --- /dev/null +++ b/core/plugin/processor/inner/ProcessorHostMetaNative.h @@ -0,0 +1,48 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "PipelineEventPtr.h" +#include "Processor.h" + +namespace logtail { +class ProcessorHostMetaNative : public Processor { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config) override; + void Process(PipelineEventGroup& group) override; + +protected: + bool IsSupportedEvent(const PipelineEventPtr& event) const override; + +private: + void ProcessEvent(PipelineEventGroup& group, PipelineEventPtr&& e, EventsContainer& newEvents); + + const std::string GetProcessEntityID(StringView pid, StringView createTime); + + std::string mDomain; + std::string mEntityType; + std::string mHostEntityID; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class ProcessorHostMetaNativeUnittest; +#endif +}; + +} // namespace logtail diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index b7ef0beac7..2bd2e77cb9 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -245,17 +245,17 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { long statusCode = 0; curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode); request->mResponse.SetStatusCode(statusCode); - static_cast(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); - FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); - mOutSuccessfulItemsTotal->Add(1); - mSuccessfulItemTotalResponseTimeMs->Add(responseTime); - mSendingItemsTotal->Sub(1); LOG_DEBUG( sLogger, ("send http request succeeded, item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "response time", ToString(responseTimeMs) + "ms")("try cnt", ToString(request->mTryCnt))( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); + static_cast(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); + FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); + mOutSuccessfulItemsTotal->Add(1); + mSuccessfulItemTotalResponseTimeMs->Add(responseTime); + mSendingItemsTotal->Sub(1); break; } default: @@ -277,9 +277,6 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { ++runningHandlers; requestReused = true; } else { - static_cast(request->mItem->mFlusher) - ->OnSendDone(request->mResponse, request->mItem); - FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); LOG_DEBUG(sLogger, ("failed to send http request", "abort")("item address", request->mItem)( "config-flusher-dst", @@ -287,6 +284,9 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { "response time", ToString(responseTimeMs) + "ms")("try cnt", ToString(request->mTryCnt))( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); + static_cast(request->mItem->mFlusher) + ->OnSendDone(request->mResponse, request->mItem); + FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); } mOutFailedItemsTotal->Add(1); mFailedItemTotalResponseTimeMs->Add(responseTime); diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index 41f1601069..f77bb6998e 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -58,6 +58,7 @@ macro(add_core_subdir) add_subdirectory(prometheus) add_subdirectory(route) add_subdirectory(task_pipeline) + add_subdirectory(host_monitor) endmacro() macro(add_spl_subdir) diff --git a/core/unittest/host_monitor/1/stat b/core/unittest/host_monitor/1/stat new file mode 100644 index 0000000000..cbda9f715f --- /dev/null +++ b/core/unittest/host_monitor/1/stat @@ -0,0 +1 @@ +1 (cat) R 0 1 1 34816 1 4194560 1110 0 0 0 1 1 0 0 20 0 1 0 18938584 4505600 171 18446744073709551615 4194304 4238788 140727020025920 0 0 0 0 0 0 0 0 0 17 3 0 0 0 0 0 6336016 6337300 21442560 140727020027760 140727020027777 140727020027777 140727020027887 0 \ No newline at end of file diff --git a/core/unittest/host_monitor/CMakeLists.txt b/core/unittest/host_monitor/CMakeLists.txt new file mode 100644 index 0000000000..7a412f8e81 --- /dev/null +++ b/core/unittest/host_monitor/CMakeLists.txt @@ -0,0 +1,38 @@ +# Copyright 2023 iLogtail Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.22) +project(host_monitor_unittest) + +add_executable(collector_manager_unittest CollectorManagerUnittest.cpp) +target_link_libraries(collector_manager_unittest ${UT_BASE_TARGET}) + +add_executable(process_collector_unittest ProcessCollectorUnittest.cpp) +target_link_libraries(process_collector_unittest ${UT_BASE_TARGET}) + +add_executable(host_monitor_input_runner_unittest HostMonitorInputRunnerUnittest.cpp) +target_link_libraries(host_monitor_input_runner_unittest ${UT_BASE_TARGET}) + +add_executable(system_information_tools_unittest SystemInformationToolsUnittest.cpp) +target_link_libraries(system_information_tools_unittest ${UT_BASE_TARGET}) + +file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/1) +file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/1/ DESTINATION ${CMAKE_CURRENT_BINARY_DIR}/1/) +file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/stat DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) + +include(GoogleTest) +gtest_discover_tests(collector_manager_unittest) +gtest_discover_tests(process_collector_unittest) +gtest_discover_tests(host_monitor_input_runner_unittest) +gtest_discover_tests(system_information_tools_unittest) diff --git a/core/unittest/host_monitor/CollectorManagerUnittest.cpp b/core/unittest/host_monitor/CollectorManagerUnittest.cpp new file mode 100644 index 0000000000..595be90ded --- /dev/null +++ b/core/unittest/host_monitor/CollectorManagerUnittest.cpp @@ -0,0 +1,42 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "CollectorManager.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class CollectorManagerUnittest : public testing::Test { +public: + void TestGetCollector() const; +}; + +void CollectorManagerUnittest::TestGetCollector() const { + { + auto collector1 = CollectorManager::GetInstance()->GetCollector("mock"); + APSARA_TEST_NOT_EQUAL_FATAL(nullptr, collector1); + APSARA_TEST_EQUAL_FATAL("mock", collector1->GetName()); + auto collector2 = CollectorManager::GetInstance()->GetCollector("mock"); + APSARA_TEST_EQUAL_FATAL(collector1, collector2); + } +} + + +UNIT_TEST_CASE(CollectorManagerUnittest, TestGetCollector); + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp b/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp new file mode 100644 index 0000000000..76946515fd --- /dev/null +++ b/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp @@ -0,0 +1,78 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "HostMonitorInputRunner.h" +#include "Logger.h" +#include "ProcessQueueItem.h" +#include "ProcessQueueManager.h" +#include "QueueKey.h" +#include "QueueKeyManager.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class HostMonitorInputRunnerUnittest : public testing::Test { +public: + void TestUpdateAndRemoveCollector() const; + void TestScheduleOnce() const; +}; + +void HostMonitorInputRunnerUnittest::TestUpdateAndRemoveCollector() const { + auto runner = HostMonitorInputRunner::GetInstance(); + runner->Init(); + runner->UpdateCollector("test", {"mock"}, QueueKey{}); + APSARA_TEST_TRUE_FATAL(runner->IsCollectTaskValid("test", "mock")); + APSARA_TEST_TRUE_FATAL(runner->HasRegisteredPlugins()); + runner->RemoveCollector("test"); + APSARA_TEST_FALSE_FATAL(runner->IsCollectTaskValid("test", "mock")); + APSARA_TEST_FALSE_FATAL(runner->HasRegisteredPlugins()); + runner->Stop(); +} + +void HostMonitorInputRunnerUnittest::TestScheduleOnce() const { + auto runner = HostMonitorInputRunner::GetInstance(); + runner->mTimer = std::make_shared(); + runner->Init(); + runner->mThreadPool->Start(); + std::string configName = "test"; + auto queueKey = QueueKeyManager::GetInstance()->GetKey(configName); + auto ctx = PipelineContext(); + ctx.SetConfigName(configName); + ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(queueKey, 0, ctx); + + HostMonitorTimerEvent event(std::chrono::steady_clock::now(), 15, configName, "mock", queueKey); + runner->ScheduleOnce(&event); + std::this_thread::sleep_for(std::chrono::seconds(1)); + auto item = std::unique_ptr(new ProcessQueueItem(std::make_shared(), 0)); + ProcessQueueManager::GetInstance()->EnablePop(configName); + APSARA_TEST_TRUE_FATAL(ProcessQueueManager::GetInstance()->PopItem(0, item, configName)); + APSARA_TEST_EQUAL_FATAL("test", configName); + APSARA_TEST_TRUE_FATAL(item->mEventGroup.GetEvents().size() == 1); + + // verify schdule next + APSARA_TEST_EQUAL_FATAL(runner->mTimer->mQueue.size(), 1); + runner->mThreadPool->Stop(); + runner->Stop(); +} + +UNIT_TEST_CASE(HostMonitorInputRunnerUnittest, TestUpdateAndRemoveCollector); +UNIT_TEST_CASE(HostMonitorInputRunnerUnittest, TestScheduleOnce); + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/host_monitor/ProcessCollectorUnittest.cpp b/core/unittest/host_monitor/ProcessCollectorUnittest.cpp new file mode 100644 index 0000000000..8e4c8c327c --- /dev/null +++ b/core/unittest/host_monitor/ProcessCollectorUnittest.cpp @@ -0,0 +1,58 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ProcessCollector.h" +#include "host_monitor/Constants.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class ProcessCollectorUnittest : public testing::Test { +public: + void TestReadProcessStat() const; + void TestSortProcessByCpu() const; +}; + +void ProcessCollectorUnittest::TestReadProcessStat() const { + PROCESS_DIR = "."; + auto collector = ProcessCollector(); + auto ptr = collector.ReadProcessStat(1); + APSARA_TEST_NOT_EQUAL(nullptr, ptr); + APSARA_TEST_EQUAL(1, ptr->pid); + APSARA_TEST_EQUAL("cat", ptr->name); +} + +void ProcessCollectorUnittest::TestSortProcessByCpu() const { + PROCESS_DIR = "/proc"; + auto collector = ProcessCollector(); + auto processes = vector(); + collector.SortProcessByCpu(processes, 5); // fist time will be ignored + collector.SortProcessByCpu(processes, 5); + APSARA_TEST_EQUAL(5, processes.size()); + auto prev = processes[0]; + for (auto i = 1; i < processes.size(); i++) { + auto process = processes[i]; + APSARA_TEST_TRUE(process->cpuInfo.percent <= prev->cpuInfo.percent); + prev = process; + } +} + +UNIT_TEST_CASE(ProcessCollectorUnittest, TestReadProcessStat); +UNIT_TEST_CASE(ProcessCollectorUnittest, TestSortProcessByCpu); + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/host_monitor/SystemInformationToolsUnittest.cpp b/core/unittest/host_monitor/SystemInformationToolsUnittest.cpp new file mode 100644 index 0000000000..5da4e004d8 --- /dev/null +++ b/core/unittest/host_monitor/SystemInformationToolsUnittest.cpp @@ -0,0 +1,38 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "SystemInformationTools.h" +#include "host_monitor/Constants.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class SystemInformationToolsUnittest : public testing::Test { +public: + void TestGetSystemBootSeconds() const; +}; + +void SystemInformationToolsUnittest::TestGetSystemBootSeconds() const { + PROCESS_DIR = "."; + APSARA_TEST_EQUAL(1731142542, GetSystemBootSeconds()); +} + + +UNIT_TEST_CASE(SystemInformationToolsUnittest, TestGetSystemBootSeconds); + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/host_monitor/stat b/core/unittest/host_monitor/stat new file mode 100644 index 0000000000..2499f9d324 --- /dev/null +++ b/core/unittest/host_monitor/stat @@ -0,0 +1 @@ +btime 1731142542 \ No newline at end of file diff --git a/core/unittest/input/CMakeLists.txt b/core/unittest/input/CMakeLists.txt index 49129244cf..3e18f939f5 100644 --- a/core/unittest/input/CMakeLists.txt +++ b/core/unittest/input/CMakeLists.txt @@ -36,6 +36,9 @@ target_link_libraries(input_ebpf_network_security_unittest unittest_base) add_executable(input_ebpf_network_observer_unittest InputNetworkObserverUnittest.cpp) target_link_libraries(input_ebpf_network_observer_unittest unittest_base) +add_executable(input_host_meat_unittest InputHostMetaUnittest.cpp) +target_link_libraries(input_host_meat_unittest unittest_base) + include(GoogleTest) gtest_discover_tests(input_file_unittest) gtest_discover_tests(input_container_stdio_unittest) @@ -44,3 +47,4 @@ gtest_discover_tests(input_ebpf_file_security_unittest) gtest_discover_tests(input_ebpf_process_security_unittest) gtest_discover_tests(input_ebpf_network_security_unittest) gtest_discover_tests(input_ebpf_network_observer_unittest) +gtest_discover_tests(input_host_meat_unittest) diff --git a/core/unittest/input/InputHostMetaUnittest.cpp b/core/unittest/input/InputHostMetaUnittest.cpp new file mode 100644 index 0000000000..9c284f945a --- /dev/null +++ b/core/unittest/input/InputHostMetaUnittest.cpp @@ -0,0 +1,131 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "PluginRegistry.h" +#include "common/JsonUtil.h" +#include "ebpf/config.h" +#include "pipeline/Pipeline.h" +#include "plugin/input/InputHostMeta.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class InputHostMetaUnittest : public testing::Test { +public: + void TestName(); + void TestSupportAck(); + void OnSuccessfulInit(); + void OnFailedInit(); + void OnSuccessfulStart(); + void OnSuccessfulStop(); + // void OnPipelineUpdate(); + +protected: + void SetUp() override { + p.mName = "test_config"; + ctx.SetConfigName("test_config"); + ctx.SetPipeline(p); + PluginRegistry::GetInstance()->LoadPlugins(); + } + +private: + Pipeline p; + PipelineContext ctx; +}; + +void InputHostMetaUnittest::TestName() { + InputHostMeta input; + std::string name = input.Name(); + APSARA_TEST_EQUAL(name, "input_host_meta"); +} + +void InputHostMetaUnittest::TestSupportAck() { + InputHostMeta input; + bool supportAck = input.SupportAck(); + APSARA_TEST_FALSE(supportAck); +} + +void InputHostMetaUnittest::OnSuccessfulInit() { + unique_ptr input; + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + + // valid optional param + configStr = R"( + { + "Type": "input_host_meta" + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + input.reset(new InputHostMeta()); + input->SetContext(ctx); + input->SetMetricsRecordRef("test", "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_EQUAL(input->sName, "input_host_meta"); +} + +void InputHostMetaUnittest::OnFailedInit() { +} + +void InputHostMetaUnittest::OnSuccessfulStart() { + unique_ptr input; + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + + configStr = R"( + { + "Type": "input_host_meta" + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + input.reset(new InputHostMeta()); + input->SetContext(ctx); + input->SetMetricsRecordRef("test", "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_TRUE(input->Start()); +} + +void InputHostMetaUnittest::OnSuccessfulStop() { + unique_ptr input; + Json::Value configJson, optionalGoPipeline; + string configStr, errorMsg; + + configStr = R"( + { + "Type": "input_host_meta" + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + input.reset(new InputHostMeta()); + input->SetContext(ctx); + input->SetMetricsRecordRef("test", "1"); + APSARA_TEST_TRUE(input->Init(configJson, optionalGoPipeline)); + APSARA_TEST_TRUE(input->Start()); + APSARA_TEST_TRUE(input->Stop(false)); +} + +UNIT_TEST_CASE(InputHostMetaUnittest, TestName) +UNIT_TEST_CASE(InputHostMetaUnittest, TestSupportAck) +UNIT_TEST_CASE(InputHostMetaUnittest, OnSuccessfulInit) +UNIT_TEST_CASE(InputHostMetaUnittest, OnFailedInit) +UNIT_TEST_CASE(InputHostMetaUnittest, OnSuccessfulStart) +UNIT_TEST_CASE(InputHostMetaUnittest, OnSuccessfulStop) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/processor/CMakeLists.txt b/core/unittest/processor/CMakeLists.txt index 98cb60a55d..ad62741998 100644 --- a/core/unittest/processor/CMakeLists.txt +++ b/core/unittest/processor/CMakeLists.txt @@ -63,6 +63,9 @@ target_link_libraries(boost_regex_benchmark ${UT_BASE_TARGET}) add_executable(processor_prom_parse_metric_native_unittest ProcessorPromParseMetricNativeUnittest.cpp) target_link_libraries(processor_prom_parse_metric_native_unittest unittest_base) +add_executable(processor_host_meta_native_unittest ProcessorHostMetaNativeUnittest.cpp) +target_link_libraries(processor_host_meta_native_unittest unittest_base) + include(GoogleTest) gtest_discover_tests(processor_split_log_string_native_unittest) gtest_discover_tests(processor_split_multiline_log_string_native_unittest) @@ -78,3 +81,4 @@ gtest_discover_tests(processor_desensitize_native_unittest) gtest_discover_tests(processor_merge_multiline_log_native_unittest) gtest_discover_tests(processor_parse_container_log_native_unittest) gtest_discover_tests(processor_prom_parse_metric_native_unittest) +gtest_discover_tests(processor_host_meta_native_unittest) diff --git a/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp b/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp new file mode 100644 index 0000000000..cc60ecda1b --- /dev/null +++ b/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp @@ -0,0 +1,120 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "LogEvent.h" +#include "constants/EntityConstants.h" +#include "pipeline/Pipeline.h" +#include "plugin/processor/inner/ProcessorHostMetaNative.h" +#include "unittest/Unittest.h" + +namespace logtail { + +class ProcessorHostMetaNativeUnittest : public ::testing::Test { +public: + void TestInit(); + void TestProcess(); + void TestGetProcessEntityID(); + +protected: + void SetUp() override { mContext.SetConfigName("project##config_0"); } + +private: + PipelineContext mContext; +}; + +void ProcessorHostMetaNativeUnittest::TestInit() { + // make config + Json::Value config; + Pipeline pipeline; + mContext.SetPipeline(pipeline); + + { + setenv(DEFAULT_ENV_KEY_HOST_TYPE.c_str(), DEFAULT_ENV_VALUE_ECS.c_str(), 1); + ProcessorHostMetaNative processor; + processor.SetContext(mContext); + APSARA_TEST_TRUE_FATAL(processor.Init(config)); + APSARA_TEST_EQUAL_FATAL(processor.mDomain, DEFAULT_CONTENT_VALUE_DOMAIN_ACS); + APSARA_TEST_EQUAL_FATAL(processor.mEntityType, + DEFAULT_CONTENT_VALUE_DOMAIN_ACS + "." + DEFAULT_ENV_VALUE_ECS + "." + + DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS); + } + { + setenv(DEFAULT_ENV_KEY_HOST_TYPE.c_str(), DEFAULT_CONTENT_VALUE_DOMAIN_INFRA.c_str(), 1); + ProcessorHostMetaNative processor; + processor.SetContext(mContext); + APSARA_TEST_TRUE_FATAL(processor.Init(config)); + APSARA_TEST_EQUAL_FATAL(processor.mDomain, DEFAULT_CONTENT_VALUE_DOMAIN_INFRA); + APSARA_TEST_EQUAL_FATAL(processor.mEntityType, + DEFAULT_CONTENT_VALUE_DOMAIN_INFRA + "." + DEFAULT_ENV_VALUE_HOST + "." + + DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS); + } +} + +void ProcessorHostMetaNativeUnittest::TestProcess() { + // make config + Json::Value config; + auto sourceBuffer = std::make_shared(); + PipelineEventGroup eventGroup(sourceBuffer); + eventGroup.SetMetadataNoCopy(EventGroupMetaKey::HOST_MONITOR_COLLECT_TIME, "123456"); + auto event = eventGroup.AddLogEvent(); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PID, "123"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME, "123456"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PPID, "123"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_USER, "root"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_COMM, "comm"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CWD, "cwd"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_BINARY, "binary"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_ARGUMENTS, "arguments"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_LANGUAGE, "language"); + event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CONTAINER_ID, "container_id"); + + ProcessorHostMetaNative processor; + processor.SetContext(mContext); + APSARA_TEST_TRUE_FATAL(processor.Init(config)); + processor.Process(eventGroup); + APSARA_TEST_EQUAL_FATAL(eventGroup.GetEvents().size(), 1); + auto newEvent = eventGroup.GetEvents().front().Cast(); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_DOMAIN), processor.mDomain); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_ENTITY_TYPE), processor.mEntityType); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME), "123456"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS), "30"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent(DEFAULT_CONTENT_KEY_METHOD), DEFAULT_CONTENT_VALUE_METHOD_UPDATE); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("pid"), "123"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("ppid"), "123"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("user"), "root"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("comm"), "comm"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("create_time"), "123456"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("cwd"), "cwd"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("binary"), "binary"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("arguments"), "arguments"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("language"), "language"); + APSARA_TEST_EQUAL_FATAL(newEvent.GetContent("containerID"), "container_id"); +} + +void ProcessorHostMetaNativeUnittest::TestGetProcessEntityID() { + ProcessorHostMetaNative processor; + processor.Init(Json::Value()); + processor.mHostEntityID = "123"; + APSARA_TEST_EQUAL(processor.GetProcessEntityID("123", "123"), "f5bb0c8de146c67b44babbf4e6584cc0"); +} + +UNIT_TEST_CASE(ProcessorHostMetaNativeUnittest, TestInit) +UNIT_TEST_CASE(ProcessorHostMetaNativeUnittest, TestProcess) +UNIT_TEST_CASE(ProcessorHostMetaNativeUnittest, TestGetProcessEntityID) + +} // namespace logtail + +UNIT_TEST_MAIN From 8f9f8ee0760557c790376cfced1981553a944ac2 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Fri, 29 Nov 2024 00:01:27 +0800 Subject: [PATCH 2/2] fix --- core/application/Application.cpp | 6 +- core/common/FileSystemUtil.cpp | 4 +- core/common/FileSystemUtil.h | 8 +- core/common/MachineInfoUtil.cpp | 93 +++++++++++ core/common/MachineInfoUtil.h | 8 +- core/common/StringTools.cpp | 2 +- core/common/common.cmake | 2 +- core/common/timer/HostMonitorTimerEvent.h | 60 ------- core/common/timer/Timer.cpp | 6 + core/common/timer/Timer.h | 10 ++ core/host_monitor/Constants.cpp | 6 +- core/host_monitor/Constants.h | 10 +- core/host_monitor/HostMonitorInputRunner.cpp | 147 ++++++++++-------- core/host_monitor/HostMonitorInputRunner.h | 31 ++-- .../HostMonitorTimerEvent.cpp | 7 +- core/host_monitor/HostMonitorTimerEvent.h | 62 ++++++++ core/host_monitor/collector/BaseCollector.h | 0 .../collector/CollectorManager.cpp | 44 ------ .../host_monitor/collector/CollectorManager.h | 44 ------ core/host_monitor/collector/MockCollector.cpp | 33 ---- core/host_monitor/collector/MockCollector.h | 36 ----- .../collector/ProcessCollector.cpp | 12 +- .../host_monitor/collector/ProcessCollector.h | 9 +- core/models/PipelineEventGroup.h | 2 +- core/pipeline/PipelineManager.cpp | 1 + core/plugin/input/InputHostMeta.cpp | 6 +- core/plugin/input/InputHostMeta.h | 7 +- .../inner/ProcessorHostMetaNative.cpp | 6 +- core/unittest/host_monitor/CMakeLists.txt | 4 - .../host_monitor/CollectorManagerUnittest.cpp | 42 ----- .../HostMonitorInputRunnerUnittest.cpp | 18 ++- .../ProcessorHostMetaNativeUnittest.cpp | 2 +- 32 files changed, 326 insertions(+), 402 deletions(-) delete mode 100644 core/common/timer/HostMonitorTimerEvent.h mode change 100644 => 100755 core/host_monitor/HostMonitorInputRunner.cpp rename core/{common/timer => host_monitor}/HostMonitorTimerEvent.cpp (78%) create mode 100644 core/host_monitor/HostMonitorTimerEvent.h mode change 100644 => 100755 core/host_monitor/collector/BaseCollector.h delete mode 100644 core/host_monitor/collector/CollectorManager.cpp delete mode 100644 core/host_monitor/collector/CollectorManager.h delete mode 100644 core/host_monitor/collector/MockCollector.cpp delete mode 100644 core/host_monitor/collector/MockCollector.h mode change 100644 => 100755 core/host_monitor/collector/ProcessCollector.cpp mode change 100644 => 100755 core/host_monitor/collector/ProcessCollector.h delete mode 100644 core/unittest/host_monitor/CollectorManagerUnittest.cpp diff --git a/core/application/Application.cpp b/core/application/Application.cpp index 09901cced9..7cd18c73e3 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -14,6 +14,8 @@ #include "application/Application.h" +#include "timer/Timer.h" + #ifndef LOGTAIL_NO_TC_MALLOC #include #endif @@ -263,9 +265,9 @@ void Application::Start() { // GCOVR_EXCL_START } ProcessorRunner::GetInstance()->Init(); + Timer::GetInstance()->Init(); - time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, - lastCheckTagsTime = 0, lastQueueGCTime = 0; + time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0; #ifndef LOGTAIL_NO_TC_MALLOC time_t lastTcmallocReleaseMemTime = 0; #endif diff --git a/core/common/FileSystemUtil.cpp b/core/common/FileSystemUtil.cpp index 80d7e24fd0..28d51fb9f2 100644 --- a/core/common/FileSystemUtil.cpp +++ b/core/common/FileSystemUtil.cpp @@ -156,7 +156,7 @@ int GetLines(std::istream& is, return 0; } -int GetLines(const bfs::path& filename, +int GetLines(const std::filesystem::path& filename, bool enableEmptyLine, const std::function& pushBack, std::string* errorMessage) { @@ -178,7 +178,7 @@ int GetLines(const bfs::path& filename, return ret; } -int GetFileLines(const bfs::path& filename, +int GetFileLines(const std::filesystem::path& filename, std::vector& res, bool enableEmptyLine, std::string* errorMessage) { diff --git a/core/common/FileSystemUtil.h b/core/common/FileSystemUtil.h index 8e0f523600..4254c85b3f 100644 --- a/core/common/FileSystemUtil.h +++ b/core/common/FileSystemUtil.h @@ -26,14 +26,12 @@ #elif defined(_MSC_VER) #include #endif -#include +#include #include "DevInode.h" #include "ErrorUtil.h" #include "LogtailCommonFlags.h" -namespace bfs = boost::filesystem; - // Filesystem utility. namespace logtail { @@ -95,11 +93,11 @@ int GetLines(std::istream& is, bool enableEmptyLine, const std::function& pushBack, std::string* errorMessage); -int GetLines(const bfs::path& filename, +int GetLines(const std::filesystem::path& filename, bool enableEmptyLine, const std::function& pushBack, std::string* errorMessage); -int GetFileLines(const bfs::path& filename, +int GetFileLines(const std::filesystem::path& filename, std::vector& res, bool enableEmptyLine = true, std::string* errorMessage = nullptr); diff --git a/core/common/MachineInfoUtil.cpp b/core/common/MachineInfoUtil.cpp index 47e0650fa7..d545802427 100644 --- a/core/common/MachineInfoUtil.cpp +++ b/core/common/MachineInfoUtil.cpp @@ -15,6 +15,11 @@ #include "MachineInfoUtil.h" #include + +#include + +#include "AppConfig.h" +#include "common/UUIDUtil.h" #if defined(__linux__) #include #include @@ -40,8 +45,10 @@ #include "FileSystemUtil.h" #include "StringTools.h" +#include "common/FileSystemUtil.h" #include "logger/Logger.h" +DEFINE_FLAG_STRING(agent_host_id, "", ""); #if defined(_MSC_VER) typedef LONG NTSTATUS, *PNTSTATUS; @@ -496,6 +503,92 @@ size_t FetchECSMetaCallback(char* buffer, size_t size, size_t nmemb, std::string return sizes; } +std::string GetSerialNumberFromEcsAssist(const std::string& machineIdFile) { + std::string sn; + if (CheckExistance(machineIdFile)) { + if (!ReadFileContent(machineIdFile, sn)) { + return ""; + } + } + return sn; +} + +static std::string GetEcsAssistMachineIdFile() { +#if defined(WIN32) + return "C:\\ProgramData\\aliyun\\assist\\hybrid\\machine-id"; +#else + return "/usr/local/share/aliyun-assist/hybrid/machine-id"; +#endif +} + +std::string GetSerialNumberFromEcsAssist() { + return GetSerialNumberFromEcsAssist(GetEcsAssistMachineIdFile()); +} + +std::string RandomHostid() { + static std::string hostId = CalculateRandomUUID(); + return hostId; +} + +const std::string& GetLocalHostId() { + static std::string fileName = AppConfig::GetInstance()->GetLoongcollectorConfDir() + PATH_SEPARATOR + "host_id"; + static std::string hostId; + if (!hostId.empty()) { + return hostId; + } + if (CheckExistance(fileName)) { + if (!ReadFileContent(fileName, hostId)) { + hostId = ""; + } + } + if (hostId.empty()) { + hostId = RandomHostid(); + + LOG_INFO(sLogger, ("save hostId file to local file system, hostId", hostId)); + int fd = open(fileName.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0755); + if (fd == -1) { + int savedErrno = errno; + if (savedErrno != EEXIST) { + LOG_ERROR(sLogger, ("save hostId file fail", fileName)("errno", strerror(savedErrno))); + } + } else { + // 文件成功创建,现在写入hostId + ssize_t written = write(fd, hostId.c_str(), hostId.length()); + if (written == static_cast(hostId.length())) { + LOG_INFO(sLogger, ("hostId saved successfully to", fileName)); + } else { + int writeErrno = errno; + LOG_ERROR(sLogger, ("Failed to write hostId to file", fileName)("errno", strerror(writeErrno))); + } + close(fd); + } + } + return hostId; +} + +std::string FetchHostId() { + static std::string hostId; + if (!hostId.empty()) { + return hostId; + } + hostId = STRING_FLAG(agent_host_id); + if (!hostId.empty()) { + return hostId; + } + ECSMeta meta = FetchECSMeta(); + hostId = meta.instanceID; + if (!hostId.empty()) { + return hostId; + } + hostId = GetSerialNumberFromEcsAssist(); + if (!hostId.empty()) { + return hostId; + } + hostId = GetLocalHostId(); + + return hostId; +} + ECSMeta FetchECSMeta() { CURL* curl; for (size_t retryTimes = 1; retryTimes <= 5; retryTimes++) { diff --git a/core/common/MachineInfoUtil.h b/core/common/MachineInfoUtil.h index 0983cbf352..d8d6a3e963 100644 --- a/core/common/MachineInfoUtil.h +++ b/core/common/MachineInfoUtil.h @@ -15,10 +15,11 @@ */ #pragma once +#include +#include + #include #include -#include -#include namespace logtail { @@ -37,8 +38,9 @@ std::string GetHostIp(const std::string& intf = ""); void GetAllPids(std::unordered_set& pids); bool GetKernelInfo(std::string& kernelRelease, int64_t& kernelVersion); bool GetRedHatReleaseInfo(std::string& os, int64_t& osVersion, std::string bashPath = ""); -bool IsDigitsDotsHostname(const char *hostname); +bool IsDigitsDotsHostname(const char* hostname); ECSMeta FetchECSMeta(); +std::string FetchHostId(); // GetAnyAvailableIP walks through all interfaces (AF_INET) to find an available IP. // Priority: diff --git a/core/common/StringTools.cpp b/core/common/StringTools.cpp index 57b29cfde9..36ad405bfc 100644 --- a/core/common/StringTools.cpp +++ b/core/common/StringTools.cpp @@ -365,7 +365,7 @@ void RemoveFilePathTrailingSlash(std::string& filePath) { bool IsInt(const char* sz) { bool ok = (sz != nullptr && *sz != '\0'); - for (auto* it = reinterpret_cast(sz); ok && *it; ++it) { + for (auto* it = sz; ok && *it; ++it) { ok = (0 != std::isdigit(*it)); } return ok; diff --git a/core/common/common.cmake b/core/common/common.cmake index 94d715bf8d..cd5e9401c6 100644 --- a/core/common/common.cmake +++ b/core/common/common.cmake @@ -29,7 +29,7 @@ list(APPEND THIS_SOURCE_FILES_LIST ${XX_HASH_SOURCE_FILES}) # add memory in common list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/memory/SourceBuffer.h) list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/http/AsynCurlRunner.cpp ${CMAKE_SOURCE_DIR}/common/http/Curl.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpResponse.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpRequest.cpp) -list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp ${CMAKE_SOURCE_DIR}/common/timer/HostMonitorTimerEvent.cpp) +list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp) list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/compression/Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/CompressorFactory.cpp ${CMAKE_SOURCE_DIR}/common/compression/LZ4Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/ZstdCompressor.cpp) # remove several files in common list(REMOVE_ITEM THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/BoostRegexValidator.cpp ${CMAKE_SOURCE_DIR}/common/GetUUID.cpp) diff --git a/core/common/timer/HostMonitorTimerEvent.h b/core/common/timer/HostMonitorTimerEvent.h deleted file mode 100644 index e9f3c08457..0000000000 --- a/core/common/timer/HostMonitorTimerEvent.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright 2024 iLogtail Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include "QueueKey.h" -#include "timer/TimerEvent.h" - -namespace logtail { - -class HostMonitorTimerEvent : public TimerEvent { -public: - HostMonitorTimerEvent(std::chrono::steady_clock::time_point execTime, - size_t interval, - std::string configName, - std::string collectorName, - QueueKey processQueueKey) - : TimerEvent(execTime), - mConfigName(std::move(configName)), - mCollectorName(collectorName), - mProcessQueueKey(processQueueKey), - mInputIdx(0) { - mInterval = std::chrono::seconds(interval); - } - - bool IsValid() const override; - bool Execute() override; - - const std::string GetConfigName() const { return mConfigName; } - const std::string GetCollectorName() const { return mCollectorName; } - const QueueKey GetProcessQueueKey() const { return mProcessQueueKey; } - int GetInputIndex() const { return mInputIdx; } - const std::chrono::seconds GetInterval() const { return mInterval; } - void ResetForNextExec() { SetExecTime(GetExecTime() + mInterval); } - -private: - std::string mConfigName; - std::string mCollectorName; - QueueKey mProcessQueueKey; - int mInputIdx; - std::chrono::seconds mInterval; -}; - -} // namespace logtail diff --git a/core/common/timer/Timer.cpp b/core/common/timer/Timer.cpp index 08ed7ed12c..57c7ab77f1 100644 --- a/core/common/timer/Timer.cpp +++ b/core/common/timer/Timer.cpp @@ -23,6 +23,9 @@ namespace logtail { void Timer::Init() { { lock_guard lock(mThreadRunningMux); + if (mIsThreadRunning) { + return; + } mIsThreadRunning = true; } mThreadRes = async(launch::async, &Timer::Run, this); @@ -31,6 +34,9 @@ void Timer::Init() { void Timer::Stop() { { lock_guard lock(mThreadRunningMux); + if (!mIsThreadRunning) { + return; + } mIsThreadRunning = false; } mCV.notify_one(); diff --git a/core/common/timer/Timer.h b/core/common/timer/Timer.h index 4b241e91d5..ad1ca15be4 100644 --- a/core/common/timer/Timer.h +++ b/core/common/timer/Timer.h @@ -34,11 +34,21 @@ struct TimerEventCompare { class Timer { public: + Timer(const Timer&) = delete; + Timer(Timer&&) = delete; + Timer& operator=(const Timer&) = delete; + Timer& operator=(Timer&&) = delete; + ~Timer() = default; + static Timer* GetInstance() { + static Timer sInstance; + return &sInstance; + } void Init(); void Stop(); void PushEvent(std::unique_ptr&& e); private: + Timer() = default; void Run(); mutable std::mutex mQueueMux; diff --git a/core/host_monitor/Constants.cpp b/core/host_monitor/Constants.cpp index 366b314480..4d538400e7 100644 --- a/core/host_monitor/Constants.cpp +++ b/core/host_monitor/Constants.cpp @@ -20,11 +20,11 @@ namespace logtail { #ifndef APSARA_UNIT_TEST_MAIN -const bfs::path PROCESS_DIR = "/proc"; +const std::filesystem::path PROCESS_DIR = "/proc"; #else -bfs::path PROCESS_DIR = "/proc"; +std::filesystem::path PROCESS_DIR = "/proc"; #endif -const bfs::path PROCESS_STAT = "stat"; +const std::filesystem::path PROCESS_STAT = "stat"; } // namespace logtail diff --git a/core/host_monitor/Constants.h b/core/host_monitor/Constants.h index 788065b81e..cdbf3e7dd2 100644 --- a/core/host_monitor/Constants.h +++ b/core/host_monitor/Constants.h @@ -16,18 +16,16 @@ #pragma once -#include - -namespace bfs = boost::filesystem; +#include namespace logtail { #ifndef APSARA_UNIT_TEST_MAIN -extern const bfs::path PROCESS_DIR; +extern const std::filesystem::path PROCESS_DIR; #else -extern bfs::path PROCESS_DIR; +extern std::filesystem::path PROCESS_DIR; #endif -const extern bfs::path PROCESS_STAT; +const extern std::filesystem::path PROCESS_STAT; } // namespace logtail diff --git a/core/host_monitor/HostMonitorInputRunner.cpp b/core/host_monitor/HostMonitorInputRunner.cpp old mode 100644 new mode 100755 index ab2618623e..926be0151f --- a/core/host_monitor/HostMonitorInputRunner.cpp +++ b/core/host_monitor/HostMonitorInputRunner.cpp @@ -16,80 +16,89 @@ #include "HostMonitorInputRunner.h" +#include #include #include +#include +#include +#include +#include -#include "CollectorManager.h" -#include "Lock.h" -#include "LogEvent.h" -#include "Logger.h" -#include "ProcessQueueItem.h" -#include "ProcessQueueManager.h" -#include "ThreadPool.h" -#include "timer/HostMonitorTimerEvent.h" -#include "timer/TimerEvent.h" +#include "HostMonitorTimerEvent.h" +#include "common/Lock.h" +#include "common/timer/Timer.h" +#include "host_monitor/collector/ProcessCollector.h" +#include "logger/Logger.h" +#include "runner/ProcessorRunner.h" namespace logtail { -HostMonitorInputRunner::HostMonitorInputRunner() { - mTimer = std::make_shared(); - mThreadPool = std::make_shared(3); +HostMonitorInputRunner::HostMonitorInputRunner() : mThreadPool(ThreadPool(3)) { + RegisterCollector(); } void HostMonitorInputRunner::UpdateCollector(const std::string& configName, - const std::vector& collectorNames, - QueueKey processQueueKey) { - WriteLock lock(mCollectorMapRWLock); - mCollectorMap[configName] = collectorNames; - for (const auto& collectorName : collectorNames) { + const std::vector& newCollectors, + QueueKey processQueueKey, + int inputIndex) { + std::vector oldCollectors; + { + std::unique_lock lock(mCollectorRegisterMapMutex); + auto it = mCollectorRegisterMap.find(configName); + if (it != mCollectorRegisterMap.end()) { + oldCollectors = it->second; + } + mCollectorRegisterMap[configName] = newCollectors; + } + for (const auto& collectorName : newCollectors) { LOG_INFO(sLogger, ("add new host monitor collector", configName)("collector", collectorName)); - mTimer->PushEvent(BuildTimerEvent(configName, collectorName, processQueueKey)); + auto collectConfig = std::make_unique( + configName, collectorName, processQueueKey, inputIndex, std::chrono::seconds(DEFAULT_SCHEDULE_INTERVAL)); + // only push event when the collector is new added + if (std::find(oldCollectors.begin(), oldCollectors.end(), collectorName) == oldCollectors.end()) { + Timer::GetInstance()->PushEvent(BuildTimerEvent(std::move(collectConfig))); + } } } void HostMonitorInputRunner::RemoveCollector(const std::string& configName) { - WriteLock lock(mCollectorMapRWLock); - mCollectorMap.erase(configName); + std::unique_lock lock(mCollectorRegisterMapMutex); + mCollectorRegisterMap.erase(configName); } void HostMonitorInputRunner::Init() { - std::lock_guard lock(mStartMutex); - if (mIsStarted) { + if (mIsStarted.exchange(true)) { return; } LOG_INFO(sLogger, ("HostMonitorInputRunner", "Start")); - mIsStarted = true; - #ifndef APSARA_UNIT_TEST_MAIN - mThreadPool->Start(); - mTimer->Init(); + mThreadPool.Start(); #endif } void HostMonitorInputRunner::Stop() { - std::lock_guard lock(mStartMutex); - if (!mIsStarted) { + if (!mIsStarted.exchange(false)) { return; } - - mIsStarted = false; #ifndef APSARA_UNIT_TEST_MAIN - mTimer->Stop(); - mThreadPool->Stop(); + std::future result = std::async(std::launch::async, [this]() { mThreadPool.Stop(); }); + if (result.wait_for(std::chrono::seconds(3)) == std::future_status::timeout) { + LOG_ERROR(sLogger, ("HostMonitorInputRunner stop timeout 3 seconds", "may cause thread leak")); + } #endif LOG_INFO(sLogger, ("HostMonitorInputRunner", "Stop")); } bool HostMonitorInputRunner::HasRegisteredPlugins() const { - ReadLock lock(mCollectorMapRWLock); - return !mCollectorMap.empty(); + std::shared_lock lock(mCollectorRegisterMapMutex); + return !mCollectorRegisterMap.empty(); } bool HostMonitorInputRunner::IsCollectTaskValid(const std::string& configName, const std::string& collectorName) const { - ReadLock lock(mCollectorMapRWLock); - auto collectors = mCollectorMap.find(configName); - if (collectors == mCollectorMap.end()) { + std::shared_lock lock(mCollectorRegisterMapMutex); + auto collectors = mCollectorRegisterMap.find(configName); + if (collectors == mCollectorRegisterMap.end()) { return false; } for (const auto& collectorName : collectors->second) { @@ -100,46 +109,50 @@ bool HostMonitorInputRunner::IsCollectTaskValid(const std::string& configName, c return false; } -void HostMonitorInputRunner::ScheduleOnce(HostMonitorTimerEvent* event) { - // TODO: reuse event - HostMonitorTimerEvent eventCopy = *event; - mThreadPool->Add([this, eventCopy]() mutable { - auto configName = eventCopy.GetConfigName(); - auto collectorName = eventCopy.GetCollectorName(); - auto processQueueKey = eventCopy.GetProcessQueueKey(); +void HostMonitorInputRunner::ScheduleOnce(std::unique_ptr collectConfig) { + mThreadPool.Add([this, &collectConfig]() { PipelineEventGroup group(std::make_shared()); - auto collector = CollectorManager::GetInstance()->GetCollector(collectorName); - collector->Collect(group); - - std::unique_ptr item - = std::make_unique(std::move(group), eventCopy.GetInputIndex()); - if (ProcessQueueManager::GetInstance()->IsValidToPush(processQueueKey)) { - ProcessQueueManager::GetInstance()->PushQueue(processQueueKey, std::move(item)); - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - // try again - if (ProcessQueueManager::GetInstance()->IsValidToPush(processQueueKey)) { - ProcessQueueManager::GetInstance()->PushQueue(processQueueKey, std::move(item)); - } else { - LOG_WARNING(sLogger, ("process queue is full", "discard data")("config", configName)); - } + auto collector = GetCollector(collectConfig->mCollectorName); + if (!collector) { + collector->Collect(group); } - LOG_DEBUG(sLogger, ("schedule host monitor collector again", configName)("collector", collectorName)); + bool result = ProcessorRunner::GetInstance()->PushQueue( + collectConfig->mProcessQueueKey, collectConfig->mInputIndex, std::move(group), 3); + if (!result) { + LOG_WARNING(sLogger, + ("push queue failed", "discard data")("config", collectConfig->mConfigName)( + "collector", collectConfig->mCollectorName)); + } + LOG_DEBUG(sLogger, + ("schedule host monitor collector again", collectConfig->mConfigName)("collector", + collectConfig->mCollectorName)); - eventCopy.ResetForNextExec(); - mTimer->PushEvent(std::make_unique(eventCopy)); + auto event = BuildTimerEvent(std::move(collectConfig)); + event->ResetForNextExec(); + Timer::GetInstance()->PushEvent(std::move(event)); }); } -std::unique_ptr HostMonitorInputRunner::BuildTimerEvent(const std::string& configName, - const std::string& collectorName, - QueueKey processQueueKey) { +std::unique_ptr +HostMonitorInputRunner::BuildTimerEvent(std::unique_ptr collectConfig) { auto now = std::chrono::steady_clock::now(); - auto event = std::make_unique( - now, DEFAULT_SCHEDULE_INTERVAL, configName, collectorName, processQueueKey); + auto event = std::make_unique(now, std::move(collectConfig)); return event; } +std::shared_ptr HostMonitorInputRunner::GetCollector(const std::string& collectorName) { + auto it = mCollectorInstanceMap.find(collectorName); + if (it == mCollectorInstanceMap.end()) { + return nullptr; + } + return it->second; +} + +template +void HostMonitorInputRunner::RegisterCollector() { + auto collector = std::make_shared(); + mCollectorInstanceMap[collector->GetName()] = collector; +} } // namespace logtail diff --git a/core/host_monitor/HostMonitorInputRunner.h b/core/host_monitor/HostMonitorInputRunner.h index 5aceb92c30..f83ffc3af9 100644 --- a/core/host_monitor/HostMonitorInputRunner.h +++ b/core/host_monitor/HostMonitorInputRunner.h @@ -16,16 +16,19 @@ #pragma once +#include #include +#include #include +#include #include +#include "BaseCollector.h" #include "InputRunner.h" #include "Lock.h" #include "QueueKey.h" #include "ThreadPool.h" -#include "timer/HostMonitorTimerEvent.h" -#include "timer/Timer.h" +#include "host_monitor/HostMonitorTimerEvent.h" namespace logtail { @@ -45,7 +48,8 @@ class HostMonitorInputRunner : public InputRunner { void UpdateCollector(const std::string& configName, const std::vector& collectorNames, - QueueKey processQueueKey); + QueueKey processQueueKey, + int inputIndex); void RemoveCollector(const std::string& configName); void Init() override; @@ -53,21 +57,24 @@ class HostMonitorInputRunner : public InputRunner { bool HasRegisteredPlugins() const override; bool IsCollectTaskValid(const std::string& configName, const std::string& collectorName) const; - void ScheduleOnce(HostMonitorTimerEvent* event); + void ScheduleOnce(std::unique_ptr collectConfig); private: HostMonitorInputRunner(); - std::unique_ptr - BuildTimerEvent(const std::string& configName, const std::string& collectorName, QueueKey processQueueKey); + std::unique_ptr + BuildTimerEvent(std::unique_ptr collectConfig); - bool mIsStarted = false; - std::mutex mStartMutex; + template + void RegisterCollector(); + std::shared_ptr GetCollector(const std::string& collectorName); - std::shared_ptr mTimer; - std::shared_ptr mThreadPool; + std::atomic_bool mIsStarted = false; - mutable ReadWriteLock mCollectorMapRWLock; - std::unordered_map> mCollectorMap; + ThreadPool mThreadPool; + + mutable std::shared_mutex mCollectorRegisterMapMutex; + std::unordered_map> mCollectorRegisterMap; + std::unordered_map> mCollectorInstanceMap; #ifdef APSARA_UNIT_TEST_MAIN friend class HostMonitorInputRunnerUnittest; diff --git a/core/common/timer/HostMonitorTimerEvent.cpp b/core/host_monitor/HostMonitorTimerEvent.cpp similarity index 78% rename from core/common/timer/HostMonitorTimerEvent.cpp rename to core/host_monitor/HostMonitorTimerEvent.cpp index 2c4fcbd355..3fb5079309 100644 --- a/core/common/timer/HostMonitorTimerEvent.cpp +++ b/core/host_monitor/HostMonitorTimerEvent.cpp @@ -16,16 +16,19 @@ #include "HostMonitorTimerEvent.h" +#include + #include "HostMonitorInputRunner.h" namespace logtail { bool HostMonitorTimerEvent::IsValid() const { - return HostMonitorInputRunner::GetInstance()->IsCollectTaskValid(mConfigName, mCollectorName); + return HostMonitorInputRunner::GetInstance()->IsCollectTaskValid(mCollectConfig->mConfigName, + mCollectConfig->mCollectorName); } bool HostMonitorTimerEvent::Execute() { - HostMonitorInputRunner::GetInstance()->ScheduleOnce(this); + HostMonitorInputRunner::GetInstance()->ScheduleOnce(std::move(mCollectConfig)); return true; } diff --git a/core/host_monitor/HostMonitorTimerEvent.h b/core/host_monitor/HostMonitorTimerEvent.h new file mode 100644 index 0000000000..07a3dceb40 --- /dev/null +++ b/core/host_monitor/HostMonitorTimerEvent.h @@ -0,0 +1,62 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "QueueKey.h" +#include "timer/TimerEvent.h" + +namespace logtail { + + +class HostMonitorTimerEvent : public TimerEvent { +public: + struct CollectConfig { + CollectConfig(std::string configName, + std::string collectorName, + QueueKey processQueueKey, + int inputIndex, + std::chrono::seconds interval) + : mConfigName(configName), + mCollectorName(collectorName), + mProcessQueueKey(processQueueKey), + mInputIndex(inputIndex), + mInterval(interval) {} + + std::string mConfigName; + std::string mCollectorName; + QueueKey mProcessQueueKey; + size_t mInputIndex; + std::chrono::seconds mInterval; + }; + + HostMonitorTimerEvent(std::chrono::steady_clock::time_point execTime, std::unique_ptr collectConfig) + : TimerEvent(execTime), mCollectConfig(std::move(collectConfig)) {} + + bool IsValid() const override; + bool Execute() override; + void ResetForNextExec() { SetExecTime(GetExecTime() + mCollectConfig->mInterval); } + +private: + std::unique_ptr mCollectConfig; +}; + +} // namespace logtail diff --git a/core/host_monitor/collector/BaseCollector.h b/core/host_monitor/collector/BaseCollector.h old mode 100644 new mode 100755 diff --git a/core/host_monitor/collector/CollectorManager.cpp b/core/host_monitor/collector/CollectorManager.cpp deleted file mode 100644 index 9a9c865b85..0000000000 --- a/core/host_monitor/collector/CollectorManager.cpp +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2024 iLogtail Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "CollectorManager.h" - -#include "BaseCollector.h" -#include "MockCollector.h" -#include "host_monitor/collector/ProcessCollector.h" - -namespace logtail { - -CollectorManager::CollectorManager() { - RegisterCollector(); - RegisterCollector(); -} - -std::shared_ptr CollectorManager::GetCollector(const std::string& collectorName) { - auto it = mCollectorMap.find(collectorName); - if (it == mCollectorMap.end()) { - return nullptr; - } - return it->second; -} - -template -void CollectorManager::RegisterCollector() { - auto collector = std::make_shared(); - mCollectorMap[collector->GetName()] = collector; -} - -} // namespace logtail diff --git a/core/host_monitor/collector/CollectorManager.h b/core/host_monitor/collector/CollectorManager.h deleted file mode 100644 index 2882bc5ef2..0000000000 --- a/core/host_monitor/collector/CollectorManager.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2024 iLogtail Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include -#include - -#include "host_monitor/collector/BaseCollector.h" -namespace logtail { - -class CollectorManager { -public: - static CollectorManager* GetInstance() { - static CollectorManager sInstance; - return &sInstance; - } - - std::shared_ptr GetCollector(const std::string& collectorName); - -private: - CollectorManager(); - ~CollectorManager() = default; - - template - void RegisterCollector(); - - std::unordered_map> mCollectorMap; -}; - -} // namespace logtail diff --git a/core/host_monitor/collector/MockCollector.cpp b/core/host_monitor/collector/MockCollector.cpp deleted file mode 100644 index c9046b5d43..0000000000 --- a/core/host_monitor/collector/MockCollector.cpp +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2024 iLogtail Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "MockCollector.h" - -#include - -#include "PipelineEventGroup.h" -#include "constants/EntityConstants.h" - -namespace logtail { - -void MockCollector::Collect(PipelineEventGroup& group) { - auto event = group.AddLogEvent(); - event->SetTimestamp(time(nullptr)); - std::string content = "mock content"; - event->SetContent("mock", content); -} - -} // namespace logtail diff --git a/core/host_monitor/collector/MockCollector.h b/core/host_monitor/collector/MockCollector.h deleted file mode 100644 index 48f8292ae8..0000000000 --- a/core/host_monitor/collector/MockCollector.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2024 iLogtail Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include - -#include "constants/EntityConstants.h" -#include "host_monitor/collector/BaseCollector.h" - -namespace logtail { - -class MockCollector : public BaseCollector { -public: - MockCollector() { mName = "mock"; }; - ~MockCollector() override = default; - - void Collect(PipelineEventGroup& group) override; - -private: -}; - -} // namespace logtail diff --git a/core/host_monitor/collector/ProcessCollector.cpp b/core/host_monitor/collector/ProcessCollector.cpp old mode 100644 new mode 100755 index 5479f7909b..c999c8444c --- a/core/host_monitor/collector/ProcessCollector.cpp +++ b/core/host_monitor/collector/ProcessCollector.cpp @@ -42,9 +42,7 @@ namespace logtail { const size_t ProcessTopN = 20; void ProcessCollector::Collect(PipelineEventGroup& group) { - std::lock_guard lock(mCollectLock); - - group.SetMetadata(EventGroupMetaKey::HOST_MONITOR_COLLECT_TIME, std::to_string(time(nullptr))); + group.SetMetadata(EventGroupMetaKey::COLLECT_TIME, std::to_string(time(nullptr))); std::vector processes; SortProcessByCpu(processes, ProcessTopN); for (auto process : processes) { @@ -205,13 +203,15 @@ ProcessStatPtr ProcessCollector::ParseProcessStat(pid_t pid, std::string& line) return ptr; } -bool ProcessCollector::WalkAllProcess(const bfs::path& root, const std::function& callback) { - if (!bfs::exists(root) || !bfs::is_directory(root)) { +bool ProcessCollector::WalkAllProcess(const std::filesystem::path& root, + const std::function& callback) { + if (!std::filesystem::exists(root) || !std::filesystem::is_directory(root)) { LOG_ERROR(sLogger, ("ProcessCollector", "root path is not a directory or not exist")("root", root)); return false; } - for (const auto& dirEntry : bfs::directory_iterator{root, bfs::directory_options::skip_permission_denied}) { + for (const auto& dirEntry : + std::filesystem::directory_iterator{root, std::filesystem::directory_options::skip_permission_denied}) { std::string filename = dirEntry.path().filename().string(); if (IsInt(filename)) { callback(filename); diff --git a/core/host_monitor/collector/ProcessCollector.h b/core/host_monitor/collector/ProcessCollector.h old mode 100644 new mode 100755 index 836f106812..a7fa8c5aa4 --- a/core/host_monitor/collector/ProcessCollector.h +++ b/core/host_monitor/collector/ProcessCollector.h @@ -22,23 +22,19 @@ #include #include #include +#include #include -#include -#include #include #include #include "Flags.h" #include "Logger.h" -#include "MachineInfoUtil.h" -#include "StringTools.h" #include "constants/EntityConstants.h" #include "host_monitor/Constants.h" #include "host_monitor/collector/BaseCollector.h" DECLARE_FLAG_INT32(process_collect_silent_count); -namespace bfs = boost::filesystem; using namespace std::chrono; namespace logtail { @@ -167,11 +163,10 @@ class ProcessCollector : public BaseCollector { ProcessStatPtr ReadProcessStat(pid_t pid); ProcessStatPtr ParseProcessStat(pid_t pid, std::string& line); - bool WalkAllProcess(const bfs::path& root, const std::function& callback); + bool WalkAllProcess(const std::filesystem::path& root, const std::function& callback); ProcessStatPtr GetPreProcessStat(pid_t pid) { return mPrevProcessStat[pid]; } - std::mutex mCollectLock; steady_clock::time_point mProcessSortTime; std::vector mSortProcessStats; std::unordered_map mPrevProcessStat; diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index be4508827e..885f343ba9 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -58,7 +58,7 @@ enum class EventGroupMetaKey { PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC, PROMETHEUS_UP_STATE, - HOST_MONITOR_COLLECT_TIME, + COLLECT_TIME, SOURCE_ID }; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 183262da9c..be6ce21c1b 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -199,6 +199,7 @@ void PipelineManager::StopAllPipelines() { LogtailPlugin::GetInstance()->StopAllPipelines(true); + Timer::GetInstance()->Stop(); ProcessorRunner::GetInstance()->Stop(); FlushAllBatch(); diff --git a/core/plugin/input/InputHostMeta.cpp b/core/plugin/input/InputHostMeta.cpp index b8dd6c93fd..c62316117e 100644 --- a/core/plugin/input/InputHostMeta.cpp +++ b/core/plugin/input/InputHostMeta.cpp @@ -38,13 +38,15 @@ bool InputHostMeta::Start() { LOG_INFO(sLogger, ("input host meta start", mContext->GetConfigName())); HostMonitorInputRunner::GetInstance()->Init(); HostMonitorInputRunner::GetInstance()->UpdateCollector( - mContext->GetConfigName(), {"process"}, mContext->GetProcessQueueKey()); + mContext->GetConfigName(), {"process"}, mContext->GetProcessQueueKey(), mIndex); return true; } bool InputHostMeta::Stop(bool isPipelineRemoving) { LOG_INFO(sLogger, ("input host meta stop", mContext->GetConfigName())); - HostMonitorInputRunner::GetInstance()->RemoveCollector(mContext->GetConfigName()); + if (isPipelineRemoving) { + HostMonitorInputRunner::GetInstance()->RemoveCollector(mContext->GetConfigName()); + } return true; } diff --git a/core/plugin/input/InputHostMeta.h b/core/plugin/input/InputHostMeta.h index 94587705ba..b31a901599 100644 --- a/core/plugin/input/InputHostMeta.h +++ b/core/plugin/input/InputHostMeta.h @@ -16,7 +16,6 @@ #pragma once -#include "json/value.h" #include "pipeline/plugin/interface/Input.h" namespace logtail { @@ -29,15 +28,11 @@ class InputHostMeta : public Input { bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; bool Start() override; bool Stop(bool isPipelineRemoving) override; - bool SupportAck() const override { return false; } + bool SupportAck() const override { return true; } private: bool CreateInnerProcessors(const Json::Value& config); - std::string mDomain; - std::string mEntityType; - std::string mHostEntityID; - #ifdef APSARA_UNIT_TEST_MAIN friend class InputHostMetaUnittest; #endif diff --git a/core/plugin/processor/inner/ProcessorHostMetaNative.cpp b/core/plugin/processor/inner/ProcessorHostMetaNative.cpp index 1b8b845e40..c64016a4d0 100644 --- a/core/plugin/processor/inner/ProcessorHostMetaNative.cpp +++ b/core/plugin/processor/inner/ProcessorHostMetaNative.cpp @@ -39,9 +39,8 @@ bool ProcessorHostMetaNative::Init(const Json::Value& config) { if (hostType == DEFAULT_ENV_VALUE_ECS) { oss << DEFAULT_CONTENT_VALUE_DOMAIN_ACS << "." << DEFAULT_ENV_VALUE_ECS << "." << DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS; - ECSMeta ecsMeta = FetchECSMeta(); mDomain = DEFAULT_CONTENT_VALUE_DOMAIN_ACS; - mHostEntityID = ecsMeta.instanceID; + mHostEntityID = FetchHostId(); } else { oss << DEFAULT_CONTENT_VALUE_DOMAIN_INFRA << "." << DEFAULT_ENV_VALUE_HOST << "." << DEFAULT_CONTENT_VALUE_ENTITY_TYPE_PROCESS; @@ -86,8 +85,7 @@ void ProcessorHostMetaNative::ProcessEvent(PipelineEventGroup& group, targetEvent->SetContent(DEFAULT_CONTENT_KEY_DOMAIN, mDomain); targetEvent->SetContent(DEFAULT_CONTENT_KEY_FIRST_OBSERVED_TIME, sourceEvent.GetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME)); - targetEvent->SetContent(DEFAULT_CONTENT_KEY_LAST_OBSERVED_TIME, - group.GetMetadata(EventGroupMetaKey::HOST_MONITOR_COLLECT_TIME)); + targetEvent->SetContent(DEFAULT_CONTENT_KEY_LAST_OBSERVED_TIME, group.GetMetadata(EventGroupMetaKey::COLLECT_TIME)); targetEvent->SetContent(DEFAULT_CONTENT_KEY_KEEP_ALIVE_SECONDS, "30"); // TODO: support delete event targetEvent->SetContent(DEFAULT_CONTENT_KEY_METHOD, DEFAULT_CONTENT_VALUE_METHOD_UPDATE); diff --git a/core/unittest/host_monitor/CMakeLists.txt b/core/unittest/host_monitor/CMakeLists.txt index 7a412f8e81..fd761c8c74 100644 --- a/core/unittest/host_monitor/CMakeLists.txt +++ b/core/unittest/host_monitor/CMakeLists.txt @@ -15,9 +15,6 @@ cmake_minimum_required(VERSION 3.22) project(host_monitor_unittest) -add_executable(collector_manager_unittest CollectorManagerUnittest.cpp) -target_link_libraries(collector_manager_unittest ${UT_BASE_TARGET}) - add_executable(process_collector_unittest ProcessCollectorUnittest.cpp) target_link_libraries(process_collector_unittest ${UT_BASE_TARGET}) @@ -32,7 +29,6 @@ file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/1/ DESTINATION ${CMAKE_CURRENT_BINARY_DIR} file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/stat DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) include(GoogleTest) -gtest_discover_tests(collector_manager_unittest) gtest_discover_tests(process_collector_unittest) gtest_discover_tests(host_monitor_input_runner_unittest) gtest_discover_tests(system_information_tools_unittest) diff --git a/core/unittest/host_monitor/CollectorManagerUnittest.cpp b/core/unittest/host_monitor/CollectorManagerUnittest.cpp deleted file mode 100644 index 595be90ded..0000000000 --- a/core/unittest/host_monitor/CollectorManagerUnittest.cpp +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2024 iLogtail Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "CollectorManager.h" -#include "unittest/Unittest.h" - -using namespace std; - -namespace logtail { - -class CollectorManagerUnittest : public testing::Test { -public: - void TestGetCollector() const; -}; - -void CollectorManagerUnittest::TestGetCollector() const { - { - auto collector1 = CollectorManager::GetInstance()->GetCollector("mock"); - APSARA_TEST_NOT_EQUAL_FATAL(nullptr, collector1); - APSARA_TEST_EQUAL_FATAL("mock", collector1->GetName()); - auto collector2 = CollectorManager::GetInstance()->GetCollector("mock"); - APSARA_TEST_EQUAL_FATAL(collector1, collector2); - } -} - - -UNIT_TEST_CASE(CollectorManagerUnittest, TestGetCollector); - -} // namespace logtail - -UNIT_TEST_MAIN diff --git a/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp b/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp index 76946515fd..b2345bc113 100644 --- a/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp +++ b/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp @@ -12,14 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include "HostMonitorInputRunner.h" -#include "Logger.h" +#include "HostMonitorTimerEvent.h" #include "ProcessQueueItem.h" #include "ProcessQueueManager.h" #include "QueueKey.h" #include "QueueKeyManager.h" +#include "common/timer/Timer.h" #include "unittest/Unittest.h" using namespace std; @@ -35,7 +37,7 @@ class HostMonitorInputRunnerUnittest : public testing::Test { void HostMonitorInputRunnerUnittest::TestUpdateAndRemoveCollector() const { auto runner = HostMonitorInputRunner::GetInstance(); runner->Init(); - runner->UpdateCollector("test", {"mock"}, QueueKey{}); + runner->UpdateCollector("test", {"mock"}, QueueKey{}, 0); APSARA_TEST_TRUE_FATAL(runner->IsCollectTaskValid("test", "mock")); APSARA_TEST_TRUE_FATAL(runner->HasRegisteredPlugins()); runner->RemoveCollector("test"); @@ -46,17 +48,17 @@ void HostMonitorInputRunnerUnittest::TestUpdateAndRemoveCollector() const { void HostMonitorInputRunnerUnittest::TestScheduleOnce() const { auto runner = HostMonitorInputRunner::GetInstance(); - runner->mTimer = std::make_shared(); runner->Init(); - runner->mThreadPool->Start(); + runner->mThreadPool.Start(); std::string configName = "test"; auto queueKey = QueueKeyManager::GetInstance()->GetKey(configName); auto ctx = PipelineContext(); ctx.SetConfigName(configName); ProcessQueueManager::GetInstance()->CreateOrUpdateBoundedQueue(queueKey, 0, ctx); - HostMonitorTimerEvent event(std::chrono::steady_clock::now(), 15, configName, "mock", queueKey); - runner->ScheduleOnce(&event); + auto collectConfig = std::make_unique( + configName, "process", queueKey, 0, std::chrono::seconds(1)); + runner->ScheduleOnce(std::move(collectConfig)); std::this_thread::sleep_for(std::chrono::seconds(1)); auto item = std::unique_ptr(new ProcessQueueItem(std::make_shared(), 0)); ProcessQueueManager::GetInstance()->EnablePop(configName); @@ -65,8 +67,8 @@ void HostMonitorInputRunnerUnittest::TestScheduleOnce() const { APSARA_TEST_TRUE_FATAL(item->mEventGroup.GetEvents().size() == 1); // verify schdule next - APSARA_TEST_EQUAL_FATAL(runner->mTimer->mQueue.size(), 1); - runner->mThreadPool->Stop(); + APSARA_TEST_EQUAL_FATAL(Timer::GetInstance()->mQueue.size(), 1); + runner->mThreadPool.Stop(); runner->Stop(); } diff --git a/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp b/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp index cc60ecda1b..1d00798a32 100644 --- a/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp +++ b/core/unittest/processor/ProcessorHostMetaNativeUnittest.cpp @@ -68,7 +68,7 @@ void ProcessorHostMetaNativeUnittest::TestProcess() { Json::Value config; auto sourceBuffer = std::make_shared(); PipelineEventGroup eventGroup(sourceBuffer); - eventGroup.SetMetadataNoCopy(EventGroupMetaKey::HOST_MONITOR_COLLECT_TIME, "123456"); + eventGroup.SetMetadataNoCopy(EventGroupMetaKey::COLLECT_TIME, "123456"); auto event = eventGroup.AddLogEvent(); event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_PID, "123"); event->SetContent(DEFAULT_CONTENT_KEY_PROCESS_CREATE_TIME, "123456");