Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provider add GetReadMetrics API #1885

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions core/monitor/MetricExportor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/TimeUtil.h"
#include "go_pipeline/LogtailPlugin.h"
#include "pipeline/PipelineManager.h"
#include "provider/Provider.h"
#include "protobuf/sls/sls_logs.pb.h"

using namespace sls_logs;
Expand All @@ -51,7 +52,7 @@ void MetricExportor::PushMetrics(bool forceSend) {
}

// go指标在Cpp指标前获取,是为了在 Cpp 部分指标做 SnapShot
// 前(即调用 ReadMetrics::GetInstance()->UpdateMetrics() 函数),把go部分的进程级指标填写到 Cpp
// 前(即调用 GetReadMetrics()->UpdateMetrics() 函数),把go部分的进程级指标填写到 Cpp
// 的进程级指标中去,随Cpp的进程级指标一起输出
if (LogtailPlugin::GetInstance()->IsPluginOpened()) {
PushGoMetrics();
Expand All @@ -60,17 +61,21 @@ void MetricExportor::PushMetrics(bool forceSend) {
}

void MetricExportor::PushCppMetrics() {
ReadMetrics::GetInstance()->UpdateMetrics();
GetReadMetrics()->UpdateMetrics();

if ("sls" == STRING_FLAG(metrics_report_method)) {
std::map<std::string, sls_logs::LogGroup*> logGroupMap;
ReadMetrics::GetInstance()->ReadAsLogGroup(METRIC_LABEL_KEY_REGION, METRIC_REGION_DEFAULT, logGroupMap);
GetReadMetrics()->ReadAsLogGroup(METRIC_LABEL_KEY_REGION, METRIC_REGION_DEFAULT, logGroupMap);
SendToSLS(logGroupMap);
} else if ("file" == STRING_FLAG(metrics_report_method)) {
std::string metricsContent;
ReadMetrics::GetInstance()->ReadAsFileBuffer(metricsContent);
GetReadMetrics()->ReadAsFileBuffer(metricsContent);
SendToLocalFile(metricsContent, "self-metrics-cpp");
}
} else if ("custom" == STRING_FLAG(metrics_report_method)) {
std::string metricsContent;
GetReadMetrics()->ReadAsCustomizedProtocol(metricsContent);
GetProfileSender()->SendMetricContent(metricsContent);
}
}

void MetricExportor::PushGoMetrics() {
Expand Down Expand Up @@ -162,7 +167,11 @@ void MetricExportor::PushGoDirectMetrics(std::vector<std::map<std::string, std::
std::string metricsContent;
SerializeGoDirectMetricsListToString(metricsList, metricsContent);
SendToLocalFile(metricsContent, "self-metrics-go");
}
} else if ("custom" == STRING_FLAG(metrics_report_method)) {
std::string metricsContent;
GetReadMetrics()->SerializeMetricsToString(metricsList, metricsContent);
GetProfileSender()->SendMetricContent(metricsContent);
}
}

// metrics from Go that are provided by cpp
Expand Down
1 change: 1 addition & 0 deletions core/monitor/MetricManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ void ReadMetrics::UpdateMetrics() {
}
}


MetricsRecord* ReadMetrics::GetHead() {
WriteLock lock(mReadWriteLock);
return mHead;
Expand Down
15 changes: 12 additions & 3 deletions core/monitor/MetricManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,34 @@ class WriteMetrics {
};

class ReadMetrics {
private:
protected:
ReadMetrics() = default;
mutable ReadWriteLock mReadWriteLock;
MetricsRecord* mHead = nullptr;
void Clear();
MetricsRecord* GetHead();

public:
~ReadMetrics();
virtual ~ReadMetrics();
static ReadMetrics* GetInstance() {
static ReadMetrics* ptr = new ReadMetrics();
return ptr;
}
void ReadAsLogGroup(const std::string& regionFieldName,
const std::string& defaultRegion,
std::map<std::string, sls_logs::LogGroup*>& logGroupMap) const;

void ReadAsFileBuffer(std::string& metricsContent) const;
void UpdateMetrics();


// serialize input metrics to metricsContent
virtual void SerializeMetricsToString(std::vector<std::map<std::string, std::string>>& metricsList,
std::string& metricsContent) const {}

// iterate through the metrics list to serialize into metricsContent
virtual void ReadAsCustomizedProtocol(std::string& metricsContent) const {}

void UpdateMetrics();
#ifdef APSARA_UNIT_TEST_MAIN
friend class MetricManagerUnittest;
#endif
Expand Down
8 changes: 5 additions & 3 deletions core/monitor/profile_sender/ProfileSender.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class ProfileSender {
ProfileSender(const ProfileSender&) = delete;
ProfileSender& operator=(const ProfileSender&) = delete;

virtual ~ProfileSender() = default;

static ProfileSender* GetInstance();

void SetDefaultProfileRegion(const std::string& profileRegion);
Expand All @@ -46,10 +48,10 @@ class ProfileSender {
bool IsProfileData(const std::string& region, const std::string& project, const std::string& logstore);

virtual void SendToProfileProject(const std::string& region, sls_logs::LogGroup& logGroup);


virtual void SendMetricContent(const std::string& content) {}
protected:
ProfileSender();
~ProfileSender() = default;
ProfileSender();

FlusherSLS* GetFlusher(const std::string& region);

Expand Down
5 changes: 3 additions & 2 deletions core/provider/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ project(provider)

file(GLOB LIB_SOURCE_FILES *.cpp *.h)

set(PROVIDER_SUB_DIRECTORIES_LIST
monitor/profile_sender config/feedbacker config/provider config/common_provider protobuf/config_server/v1 protobuf/config_server/v2
set(PROVIDER_SUB_DIRECTORIES_LIST
monitor monitor/profile_sender
config/feedbacker config/provider config/common_provider protobuf/config_server/v1 protobuf/config_server/v2
)

foreach(DIR_NAME IN LISTS PROVIDER_SUB_DIRECTORIES_LIST)
Expand Down
4 changes: 4 additions & 0 deletions core/provider/Provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ void InitRemoteConfigProviders() {
LegacyCommonConfigProvider::GetInstance()->Init("common");
}

ReadMetrics* GetReadMetrics() {
return ReadMetrics::GetInstance();
}

ProfileSender* GetProfileSender() {
return ProfileSender::GetInstance();
}
Expand Down
5 changes: 5 additions & 0 deletions core/provider/Provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "monitor/MetricManager.h"
#include "config/provider/ConfigProvider.h"
#include "monitor/profile_sender/ProfileSender.h"

Expand All @@ -32,6 +33,10 @@ std::vector<ConfigProvider*> GetRemoteConfigProviders();
// It currently initializes the LegacyCommonConfigProvider and CommonConfigProvider.
void InitRemoteConfigProviders();

// GetReadMetrics returns the ReadMetrics instance.
// It currently can marshal metrics to sls and file format.
ReadMetrics* GetReadMetrics();

// GetProfileSender returns the ProfileSender instance.
ProfileSender* GetProfileSender();
} // namespace logtail
17 changes: 9 additions & 8 deletions core/unittest/monitor/MetricManagerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "MetricManager.h"
#include "MetricExportor.h"
#include "MetricConstants.h"
#include "provider/Provider.h"

namespace logtail {

Expand All @@ -33,7 +34,7 @@ class MetricManagerUnittest : public ::testing::Test {
void SetUp() {}

void TearDown() {
ReadMetrics::GetInstance()->Clear();
GetReadMetrics()->Clear();
WriteMetrics::GetInstance()->Clear();
}

Expand Down Expand Up @@ -76,7 +77,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDelete() {


// assert ReadMetrics count
tmp = ReadMetrics::GetInstance()->GetHead();
tmp = GetReadMetrics()->GetHead();
count = 0;
while (tmp) {
tmp = tmp->GetNext();
Expand Down Expand Up @@ -121,7 +122,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDelete() {


// assert ReadMetrics count
tmp = ReadMetrics::GetInstance()->GetHead();
tmp = GetReadMetrics()->GetHead();
count = 0;
while (tmp) {
tmp = tmp->GetNext();
Expand Down Expand Up @@ -189,7 +190,7 @@ void MetricManagerUnittest::TestCreateMetricAutoDeleteMultiThread() {
APSARA_TEST_EQUAL(count, 0);

// assert ReadMetrics count
tmp = ReadMetrics::GetInstance()->GetHead();
tmp = GetReadMetrics()->GetHead();
count = 0;
while (tmp) {
tmp = tmp->GetNext();
Expand Down Expand Up @@ -281,7 +282,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() {
}

// assert ReadMetrics count
tmp = ReadMetrics::GetInstance()->GetHead();
tmp = GetReadMetrics()->GetHead();
count = 0;
while (tmp) {
tmp = tmp->GetNext();
Expand All @@ -291,7 +292,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() {

// assert readMetric value
if (count == 1) {
tmp = ReadMetrics::GetInstance()->GetHead();
tmp = GetReadMetrics()->GetHead();
std::vector<CounterPtr> values = tmp->GetCounters();
APSARA_TEST_EQUAL(values.size(), 1);
if (values.size() == 1) {
Expand All @@ -308,7 +309,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() {

MetricExportor::GetInstance()->PushMetrics(true);
// assert ReadMetrics count
tmp = ReadMetrics::GetInstance()->GetHead();
tmp = GetReadMetrics()->GetHead();
count = 0;
while (tmp) {
tmp = tmp->GetNext();
Expand All @@ -318,7 +319,7 @@ void MetricManagerUnittest::TestCreateAndDeleteMetric() {

// assert readMetric value
if (count == 1) {
tmp = ReadMetrics::GetInstance()->GetHead();
tmp = GetReadMetrics()->GetHead();
std::vector<CounterPtr> values = tmp->GetCounters();
APSARA_TEST_EQUAL(values.size(), 1);
if (values.size() == 1) {
Expand Down
10 changes: 10 additions & 0 deletions core/unittest/provider/ProviderUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "monitor/MetricManager.h"
#include "unittest/Unittest.h"
#include "provider/Provider.h"

Expand All @@ -22,13 +23,21 @@ class ProviderUnittest : public testing::Test {
public:
void TestGetRemoteConfigProvider();
void TestGetProfileSender();
void TestGetReadMetrics();

};
void ProviderUnittest::TestGetRemoteConfigProvider() {
auto remoteConfigProviders = GetRemoteConfigProviders();
APSARA_TEST_GT(remoteConfigProviders.size(), 0U);
}

void ProviderUnittest::TestGetReadMetrics() {
auto readMetrics = GetReadMetrics();
auto expected = ReadMetrics::GetInstance();
APSARA_TEST_NOT_EQUAL(nullptr, readMetrics);
APSARA_TEST_EQUAL(expected, readMetrics);
}

void ProviderUnittest::TestGetProfileSender() {
auto profileSender = GetProfileSender();
APSARA_TEST_NOT_EQUAL(nullptr, profileSender);
Expand All @@ -37,6 +46,7 @@ void ProviderUnittest::TestGetProfileSender() {

UNIT_TEST_CASE(ProviderUnittest, TestGetRemoteConfigProvider)
UNIT_TEST_CASE(ProviderUnittest, TestGetProfileSender)
UNIT_TEST_CASE(ProviderUnittest, TestGetReadMetrics)
} // namespace logtail

UNIT_TEST_MAIN
Loading