From 2e8f5cf73c01c0a027fb143dca9352b6bbdfcdb2 Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Wed, 13 Nov 2024 16:49:25 +0800 Subject: [PATCH 1/3] provider support external readmetrics --- core/monitor/MetricExportor.cpp | 21 +++++++++++++----- core/monitor/MetricManager.cpp | 1 + core/monitor/MetricManager.h | 22 +++++++++++++++---- core/monitor/profile_sender/ProfileSender.h | 8 ++++--- core/provider/CMakeLists.txt | 4 ++-- core/provider/Provider.cpp | 4 ++++ core/provider/Provider.h | 4 ++++ .../monitor/MetricManagerUnittest.cpp | 17 +++++++------- core/unittest/provider/ProviderUnittest.cpp | 10 +++++++++ 9 files changed, 68 insertions(+), 23 deletions(-) diff --git a/core/monitor/MetricExportor.cpp b/core/monitor/MetricExportor.cpp index 2440fe2106..635b692eb3 100644 --- a/core/monitor/MetricExportor.cpp +++ b/core/monitor/MetricExportor.cpp @@ -25,6 +25,7 @@ #include "common/TimeUtil.h" #include "go_pipeline/LogtailPlugin.h" #include "pipeline/PipelineManager.h" +#include "provider/Provider.h" #include "protobuf/sls/sls_logs.pb.h" using namespace sls_logs; @@ -51,7 +52,7 @@ void MetricExportor::PushMetrics(bool forceSend) { } // go指标在Cpp指标前获取,是为了在 Cpp 部分指标做 SnapShot - // 前(即调用 ReadMetrics::GetInstance()->UpdateMetrics() 函数),把go部分的进程级指标填写到 Cpp + // 前(即调用 GetReadMetrics()->UpdateMetrics() 函数),把go部分的进程级指标填写到 Cpp // 的进程级指标中去,随Cpp的进程级指标一起输出 if (LogtailPlugin::GetInstance()->IsPluginOpened()) { PushGoMetrics(); @@ -60,17 +61,21 @@ void MetricExportor::PushMetrics(bool forceSend) { } void MetricExportor::PushCppMetrics() { - ReadMetrics::GetInstance()->UpdateMetrics(); + GetReadMetrics()->UpdateMetrics(); if ("sls" == STRING_FLAG(metrics_report_method)) { std::map logGroupMap; - ReadMetrics::GetInstance()->ReadAsLogGroup(METRIC_LABEL_KEY_REGION, METRIC_REGION_DEFAULT, logGroupMap); + GetReadMetrics()->ReadAsLogGroup(METRIC_LABEL_KEY_REGION, METRIC_REGION_DEFAULT, logGroupMap); SendToSLS(logGroupMap); } else if ("file" == STRING_FLAG(metrics_report_method)) { std::string metricsContent; - ReadMetrics::GetInstance()->ReadAsFileBuffer(metricsContent); + GetReadMetrics()->ReadAsFileBuffer(metricsContent); SendToLocalFile(metricsContent, "self-metrics-cpp"); - } + } else if ("custom" == STRING_FLAG(metrics_report_method)) { + std::string metricsContent; + GetReadMetrics()->ReadAsCustomizedProtocol(metricsContent); + GetProfileSender()->SendMetricContent(metricsContent); + } } void MetricExportor::PushGoMetrics() { @@ -162,7 +167,11 @@ void MetricExportor::PushGoDirectMetrics(std::vectorReadAsCustomizedProtocol(metricsContent); + GetProfileSender()->SendMetricContent(metricsContent); + } } // metrics from Go that are provided by cpp diff --git a/core/monitor/MetricManager.cpp b/core/monitor/MetricManager.cpp index 4b0dada653..c415bbeae8 100644 --- a/core/monitor/MetricManager.cpp +++ b/core/monitor/MetricManager.cpp @@ -309,6 +309,7 @@ void ReadMetrics::UpdateMetrics() { } } + MetricsRecord* ReadMetrics::GetHead() { WriteLock lock(mReadWriteLock); return mHead; diff --git a/core/monitor/MetricManager.h b/core/monitor/MetricManager.h index 1d54b1cfce..1149343034 100644 --- a/core/monitor/MetricManager.h +++ b/core/monitor/MetricManager.h @@ -62,15 +62,15 @@ class WriteMetrics { }; class ReadMetrics { -private: - ReadMetrics() = default; +protected: + ReadMetrics() = default; mutable ReadWriteLock mReadWriteLock; MetricsRecord* mHead = nullptr; void Clear(); MetricsRecord* GetHead(); public: - ~ReadMetrics(); + virtual ~ReadMetrics(); static ReadMetrics* GetInstance() { static ReadMetrics* ptr = new ReadMetrics(); return ptr; @@ -78,9 +78,23 @@ class ReadMetrics { void ReadAsLogGroup(const std::string& regionFieldName, const std::string& defaultRegion, std::map& logGroupMap) const; + void ReadAsFileBuffer(std::string& metricsContent) const; - void UpdateMetrics(); + + + virtual void ReadAsLogGroup(const std::string& regionFieldName, + const std::string& defaultRegion, + std::unordered_map& logGroupMap) const; + + // serlesialize metrics to other format + virtual void SerializeMetricsToString(std::vector>& metricsList, + std::string& metricsContent) const {} + + // read metrics to other format + virtual void ReadAsCustomizedProtocol(std::string& metricsContent) const {} + + void UpdateMetrics(); #ifdef APSARA_UNIT_TEST_MAIN friend class MetricManagerUnittest; #endif diff --git a/core/monitor/profile_sender/ProfileSender.h b/core/monitor/profile_sender/ProfileSender.h index fc91d22233..ae5e9a520d 100644 --- a/core/monitor/profile_sender/ProfileSender.h +++ b/core/monitor/profile_sender/ProfileSender.h @@ -31,6 +31,8 @@ class ProfileSender { ProfileSender(const ProfileSender&) = delete; ProfileSender& operator=(const ProfileSender&) = delete; + virtual ~ProfileSender() = default; + static ProfileSender* GetInstance(); void SetDefaultProfileRegion(const std::string& profileRegion); @@ -46,10 +48,10 @@ class ProfileSender { bool IsProfileData(const std::string& region, const std::string& project, const std::string& logstore); virtual void SendToProfileProject(const std::string& region, sls_logs::LogGroup& logGroup); - + + virtual void SendMetricContent(const std::string& content) {} protected: - ProfileSender(); - ~ProfileSender() = default; + ProfileSender(); FlusherSLS* GetFlusher(const std::string& region); diff --git a/core/provider/CMakeLists.txt b/core/provider/CMakeLists.txt index 5c3cccbb50..297c6019cf 100644 --- a/core/provider/CMakeLists.txt +++ b/core/provider/CMakeLists.txt @@ -17,8 +17,8 @@ project(provider) file(GLOB LIB_SOURCE_FILES *.cpp *.h) -set(PROVIDER_SUB_DIRECTORIES_LIST - monitor/profile_sender config/feedbacker config/provider config/common_provider protobuf/config_server/v1 protobuf/config_server/v2 +set(PROVIDER_SUB_DIRECTORIES_LIST + monitor monitor/profile_sender config/feedbacker config/provider config/common_provider protobuf/config_server/v1 protobuf/config_server/v2 ) foreach(DIR_NAME IN LISTS PROVIDER_SUB_DIRECTORIES_LIST) diff --git a/core/provider/Provider.cpp b/core/provider/Provider.cpp index 52523b256c..c5f6b80f9a 100644 --- a/core/provider/Provider.cpp +++ b/core/provider/Provider.cpp @@ -34,6 +34,10 @@ void InitRemoteConfigProviders() { LegacyCommonConfigProvider::GetInstance()->Init("common"); } +ReadMetrics* GetReadMetrics() { + return ReadMetrics::GetInstance(); +} + ProfileSender* GetProfileSender() { return ProfileSender::GetInstance(); } diff --git a/core/provider/Provider.h b/core/provider/Provider.h index 90712baf78..34fc577d5b 100644 --- a/core/provider/Provider.h +++ b/core/provider/Provider.h @@ -16,6 +16,7 @@ #pragma once +#include "monitor/MetricManager.h" #include "config/provider/ConfigProvider.h" #include "monitor/profile_sender/ProfileSender.h" @@ -32,6 +33,9 @@ std::vector GetRemoteConfigProviders(); // It currently initializes the LegacyCommonConfigProvider and CommonConfigProvider. void InitRemoteConfigProviders(); +// GetReadMetrics returns the ReadMetrics instance. +ReadMetrics* GetReadMetrics(); + // GetProfileSender returns the ProfileSender instance. ProfileSender* GetProfileSender(); } // namespace logtail \ No newline at end of file diff --git a/core/unittest/monitor/MetricManagerUnittest.cpp b/core/unittest/monitor/MetricManagerUnittest.cpp index 3625105227..86f17b8ddd 100644 --- a/core/unittest/monitor/MetricManagerUnittest.cpp +++ b/core/unittest/monitor/MetricManagerUnittest.cpp @@ -21,6 +21,7 @@ #include "MetricManager.h" #include "MetricExportor.h" #include "MetricConstants.h" +#include "provider/Provider.h" namespace logtail { @@ -33,7 +34,7 @@ class MetricManagerUnittest : public ::testing::Test { void SetUp() {} void TearDown() { - ReadMetrics::GetInstance()->Clear(); + GetReadMetrics()->Clear(); WriteMetrics::GetInstance()->Clear(); } @@ -76,7 +77,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDelete() { // assert ReadMetrics count - tmp = ReadMetrics::GetInstance()->GetHead(); + tmp = GetReadMetrics()->GetHead(); count = 0; while (tmp) { tmp = tmp->GetNext(); @@ -121,7 +122,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDelete() { // assert ReadMetrics count - tmp = ReadMetrics::GetInstance()->GetHead(); + tmp = GetReadMetrics()->GetHead(); count = 0; while (tmp) { tmp = tmp->GetNext(); @@ -189,7 +190,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDeleteMultiThread() { APSARA_TEST_EQUAL(count, 0); // assert ReadMetrics count - tmp = ReadMetrics::GetInstance()->GetHead(); + tmp = GetReadMetrics()->GetHead(); count = 0; while (tmp) { tmp = tmp->GetNext(); @@ -281,7 +282,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() { } // assert ReadMetrics count - tmp = ReadMetrics::GetInstance()->GetHead(); + tmp = GetReadMetrics()->GetHead(); count = 0; while (tmp) { tmp = tmp->GetNext(); @@ -291,7 +292,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() { // assert readMetric value if (count == 1) { - tmp = ReadMetrics::GetInstance()->GetHead(); + tmp = GetReadMetrics()->GetHead(); std::vector values = tmp->GetCounters(); APSARA_TEST_EQUAL(values.size(), 1); if (values.size() == 1) { @@ -308,7 +309,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() { MetricExportor::GetInstance()->PushMetrics(true); // assert ReadMetrics count - tmp = ReadMetrics::GetInstance()->GetHead(); + tmp = GetReadMetrics()->GetHead(); count = 0; while (tmp) { tmp = tmp->GetNext(); @@ -318,7 +319,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() { // assert readMetric value if (count == 1) { - tmp = ReadMetrics::GetInstance()->GetHead(); + tmp = GetReadMetrics()->GetHead(); std::vector values = tmp->GetCounters(); APSARA_TEST_EQUAL(values.size(), 1); if (values.size() == 1) { diff --git a/core/unittest/provider/ProviderUnittest.cpp b/core/unittest/provider/ProviderUnittest.cpp index 9eb039db4c..aef13256c8 100644 --- a/core/unittest/provider/ProviderUnittest.cpp +++ b/core/unittest/provider/ProviderUnittest.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "monitor/MetricManager.h" #include "unittest/Unittest.h" #include "provider/Provider.h" @@ -22,6 +23,7 @@ class ProviderUnittest : public testing::Test { public: void TestGetRemoteConfigProvider(); void TestGetProfileSender(); + void TestGetReadMetrics(); }; void ProviderUnittest::TestGetRemoteConfigProvider() { @@ -29,6 +31,13 @@ void ProviderUnittest::TestGetRemoteConfigProvider() { APSARA_TEST_GT(remoteConfigProviders.size(), 0U); } +void ProviderUnittest::TestGetReadMetrics() { + auto readMetrics = GetReadMetrics(); + auto expected = ReadMetrics::GetInstance(); + APSARA_TEST_NOT_EQUAL(nullptr, readMetrics); + APSARA_TEST_EQUAL(expected, readMetrics); +} + void ProviderUnittest::TestGetProfileSender() { auto profileSender = GetProfileSender(); APSARA_TEST_NOT_EQUAL(nullptr, profileSender); @@ -37,6 +46,7 @@ void ProviderUnittest::TestGetProfileSender() { UNIT_TEST_CASE(ProviderUnittest, TestGetRemoteConfigProvider) UNIT_TEST_CASE(ProviderUnittest, TestGetProfileSender) +UNIT_TEST_CASE(ProviderUnittest, TestGetReadMetrics) } // namespace logtail UNIT_TEST_MAIN \ No newline at end of file From 3f066043e0199d215dd81d9ec2692695a4935246 Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Wed, 13 Nov 2024 17:38:17 +0800 Subject: [PATCH 2/3] remove duplicate func definition --- core/monitor/MetricManager.h | 13 ++++--------- core/provider/CMakeLists.txt | 3 ++- core/provider/Provider.h | 1 + 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/core/monitor/MetricManager.h b/core/monitor/MetricManager.h index 1149343034..c725678403 100644 --- a/core/monitor/MetricManager.h +++ b/core/monitor/MetricManager.h @@ -63,14 +63,14 @@ class WriteMetrics { class ReadMetrics { protected: - ReadMetrics() = default; + ReadMetrics() = default; mutable ReadWriteLock mReadWriteLock; MetricsRecord* mHead = nullptr; void Clear(); MetricsRecord* GetHead(); public: - virtual ~ReadMetrics(); + virtual ~ReadMetrics(); static ReadMetrics* GetInstance() { static ReadMetrics* ptr = new ReadMetrics(); return ptr; @@ -81,17 +81,12 @@ class ReadMetrics { void ReadAsFileBuffer(std::string& metricsContent) const; - - virtual void ReadAsLogGroup(const std::string& regionFieldName, - const std::string& defaultRegion, - std::unordered_map& logGroupMap) const; - - // serlesialize metrics to other format + // serialize input metrics to metricsContent virtual void SerializeMetricsToString(std::vector>& metricsList, std::string& metricsContent) const {} - // read metrics to other format + // iterate through the metrics list to serialize into metricsContent virtual void ReadAsCustomizedProtocol(std::string& metricsContent) const {} void UpdateMetrics(); diff --git a/core/provider/CMakeLists.txt b/core/provider/CMakeLists.txt index 297c6019cf..22ec399fe2 100644 --- a/core/provider/CMakeLists.txt +++ b/core/provider/CMakeLists.txt @@ -18,7 +18,8 @@ project(provider) file(GLOB LIB_SOURCE_FILES *.cpp *.h) set(PROVIDER_SUB_DIRECTORIES_LIST - monitor monitor/profile_sender config/feedbacker config/provider config/common_provider protobuf/config_server/v1 protobuf/config_server/v2 + monitor monitor/profile_sender + config/feedbacker config/provider config/common_provider protobuf/config_server/v1 protobuf/config_server/v2 ) foreach(DIR_NAME IN LISTS PROVIDER_SUB_DIRECTORIES_LIST) diff --git a/core/provider/Provider.h b/core/provider/Provider.h index 34fc577d5b..c70b93ff62 100644 --- a/core/provider/Provider.h +++ b/core/provider/Provider.h @@ -34,6 +34,7 @@ std::vector GetRemoteConfigProviders(); void InitRemoteConfigProviders(); // GetReadMetrics returns the ReadMetrics instance. +// It currently can marshal metrics to sls and file format. ReadMetrics* GetReadMetrics(); // GetProfileSender returns the ProfileSender instance. From 1610281f7f92fc97dae3647a92f1d9150b144803 Mon Sep 17 00:00:00 2001 From: zhushunjia Date: Wed, 13 Nov 2024 21:16:50 +0800 Subject: [PATCH 3/3] fix PushGoDirectMetrics custom marshal --- core/monitor/MetricExportor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/monitor/MetricExportor.cpp b/core/monitor/MetricExportor.cpp index 635b692eb3..0767a5e5a1 100644 --- a/core/monitor/MetricExportor.cpp +++ b/core/monitor/MetricExportor.cpp @@ -169,7 +169,7 @@ void MetricExportor::PushGoDirectMetrics(std::vectorReadAsCustomizedProtocol(metricsContent); + GetReadMetrics()->SerializeMetricsToString(metricsList, metricsContent); GetProfileSender()->SendMetricContent(metricsContent); } }