Skip to content

Commit

Permalink
Implement periodic exporting metric reader (#1286)
Browse files Browse the repository at this point in the history
  • Loading branch information
lalitb committed Apr 1, 2022
1 parent 2034c9b commit 33d9c62
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 15 deletions.
6 changes: 4 additions & 2 deletions exporters/ostream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ cc_library(
],
)

#TODO MetricData is still changing, uncomment once it is final
# TODO - Uncomment once MetricData interface is finalised
#cc_library(
# name = "ostream_metric_exporter",
# srcs = [
# "src/metric_exporter.cc",
# ],
Expand All @@ -70,7 +72,7 @@ cc_library(
# deps = [
# ":ostream_metric_exporter",
# "@com_google_googletest//:gtest_main",
#],
# ],
#)

cc_test(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW

# include "opentelemetry/sdk/metrics/metric_reader.h"
# include "opentelemetry/version.h"

# include <atomic>
# include <chrono>
# include <condition_variable>
# include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

class MetricExporter;
/**
* Struct to hold PeriodicExortingMetricReader options.
*/

constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000);
constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000);
struct PeriodicExportingMetricReaderOptions
{

/* The time interval between two consecutive exports. */
std::chrono::milliseconds export_interval_millis =
std::chrono::milliseconds(kExportIntervalMillis);

/* how long the export can run before it is cancelled. */
std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis);
};

class PeriodicExportingMetricReader : public MetricReader
{

public:
PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative);

private:
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;

bool OnShutDown(std::chrono::microseconds timeout) noexcept override;

void OnInitialized() noexcept override;

std::unique_ptr<MetricExporter> exporter_;
std::chrono::milliseconds export_interval_millis_;
std::chrono::milliseconds export_timeout_millis_;

void DoBackgroundWork();

/* The background worker thread */
std::thread worker_thread_;

/* Synchronization primitives */
std::condition_variable cv_;
std::mutex cv_m_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
9 changes: 2 additions & 7 deletions sdk/include/opentelemetry/sdk/metrics/metric_exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ class MetricExporter
/**
* Exports a batch of metrics recordables. This method must not be called
* concurrently for the same exporter instance.
* @param spans a span of unique pointers to metrics data
* @param data metrics data
*/
virtual opentelemetry::sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::metrics::MetricData>>
&records) noexcept = 0;
virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &data) noexcept = 0;

/**
* Force flush the exporter.
Expand All @@ -49,9 +47,6 @@ class MetricExporter
*/
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;

private:
AggregationTemporality aggregation_temporality_;
};
} // namespace metrics
} // namespace sdk
Expand Down
1 change: 1 addition & 0 deletions sdk/include/opentelemetry/sdk/metrics/metric_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class MetricReader

virtual void OnInitialized() noexcept {}

protected:
bool IsShutdown() const noexcept;

private:
Expand Down
1 change: 1 addition & 0 deletions sdk/src/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_library(
meter.cc
meter_context.cc
metric_reader.cc
export/periodic_exporting_metric_reader.cc
state/metric_collector.cc
state/sync_metric_storage.cc
aggregation/histogram_aggregation.cc
Expand Down
101 changes: 101 additions & 0 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
# include "opentelemetry/sdk/common/global_log_handler.h"
# include "opentelemetry/sdk/metrics/metric_exporter.h"

# include <chrono>
# include <future>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

PeriodicExportingMetricReader::PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality)
: MetricReader(aggregation_temporality),
exporter_{std::move(exporter)},
export_interval_millis_{option.export_interval_millis},
export_timeout_millis_{option.export_timeout_millis}
{
if (export_interval_millis_ <= export_timeout_millis_)
{
OTEL_INTERNAL_LOG_WARN(
"[Periodic Exporting Metric Reader] Invalid configuration: "
"export_interval_millis_ should be less than export_timeout_millis_, using default values");
export_interval_millis_ = kExportIntervalMillis;
export_timeout_millis_ = kExportTimeOutMillis;
}
}

void PeriodicExportingMetricReader::OnInitialized() noexcept
{
worker_thread_ = std::thread(&PeriodicExportingMetricReader::DoBackgroundWork, this);
}

void PeriodicExportingMetricReader::DoBackgroundWork()
{
std::unique_lock<std::mutex> lk(cv_m_);
do
{
if (IsShutdown())
{
break;
}
std::atomic<bool> cancel_export_for_timeout{false};
auto start = std::chrono::steady_clock::now();
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](MetricData data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(data);
return true;
});
});
std::future_status status;
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);
auto end = std::chrono::steady_clock::now();
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
cv_.wait_for(lk, remaining_wait_interval_ms);
} while (true);
}

bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
return exporter_->ForceFlush(timeout);
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
{
if (worker_thread_.joinable())
{
cv_.notify_one();
worker_thread_.join();
}
return exporter_->Shutdown(timeout);
}

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
10 changes: 7 additions & 3 deletions sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ MetricReader::MetricReader(AggregationTemporality aggregation_temporality)
void MetricReader::SetMetricProducer(MetricProducer *metric_producer)
{
metric_producer_ = metric_producer;
OnInitialized();
}

AggregationTemporality MetricReader::GetAggregationTemporality() const noexcept
Expand All @@ -46,18 +47,21 @@ bool MetricReader::Collect(nostd::function_ref<bool(MetricData)> callback) noexc
bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
{
bool status = true;

if (IsShutdown())
{
OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!");
}

{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
shutdown_ = true;
}

if (!OnShutDown(timeout))
{
status = false;
OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!");
}
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
shutdown_ = true;
return status;
}

Expand Down
3 changes: 2 additions & 1 deletion sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ foreach(
observer_result_test
sync_instruments_test
async_instruments_test
metric_reader_test)
metric_reader_test
periodic_exporting_metric_reader_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(
${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
Expand Down
3 changes: 1 addition & 2 deletions sdk/test/metrics/meter_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ class MockMetricExporter : public MetricExporter

public:
MockMetricExporter() = default;
opentelemetry::sdk::common::ExportResult Export(
const opentelemetry::nostd::span<std::unique_ptr<MetricData>> &records) noexcept override
opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept override
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}
Expand Down
81 changes: 81 additions & 0 deletions sdk/test/metrics/periodic_exporting_metric_reader_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW

# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
# include "opentelemetry/sdk/metrics/export/metric_producer.h"
# include "opentelemetry/sdk/metrics/metric_exporter.h"

# include <gtest/gtest.h>

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationlibrary;
using namespace opentelemetry::sdk::metrics;

class MockPushMetricExporter : public MetricExporter
{
public:
opentelemetry::sdk::common::ExportResult Export(const MetricData &record) noexcept override
{
records_.push_back(record);
return opentelemetry::sdk::common::ExportResult::kSuccess;
}

bool ForceFlush(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override
{
return false;
}

bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{
return true;
}

size_t GetDataCount() { return records_.size(); }

private:
std::vector<MetricData> records_;
};

class MockMetricProducer : public MetricProducer
{
public:
MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero())
: sleep_ms_{sleep_ms}, data_sent_size_(0)
{}

bool Collect(nostd::function_ref<bool(MetricData)> callback) noexcept override
{
std::this_thread::sleep_for(sleep_ms_);
data_sent_size_++;
MetricData data;
callback(data);
return true;
}

size_t GetDataCount() { return data_sent_size_; }

private:
std::chrono::microseconds sleep_ms_;
size_t data_sent_size_;
};

TEST(PeriodicExporingMetricReader, BasicTests)
{
std::unique_ptr<MetricExporter> exporter(new MockPushMetricExporter());
PeriodicExportingMetricReaderOptions options;
options.export_timeout_millis = std::chrono::milliseconds(200);
options.export_interval_millis = std::chrono::milliseconds(500);
auto exporter_ptr = exporter.get();
PeriodicExportingMetricReader reader(std::move(exporter), options);
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
reader.Shutdown();
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
}

#endif

1 comment on commit 33d9c62

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'OpenTelemetry-cpp sdk Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 33d9c62 Previous: 2034c9b Ratio
BM_BaselineBuffer/1 18955085.277557373 ns/iter 470928.69069080986 ns/iter 40.25
BM_LockFreeBuffer/1 3767801.7616271973 ns/iter 343715.10234399297 ns/iter 10.96

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.