Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement periodic exporting metric reader #1286

Merged
merged 21 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions exporters/ostream/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ cc_library(
],
)

#TODO MetricData is still changing, uncomment once it is final
# TODO - Uncomment once MetricData interface is finalised
#cc_library(
# name = "ostream_metric_exporter",
# srcs = [
# "src/metric_exporter.cc",
# ],
Expand All @@ -70,7 +72,7 @@ cc_library(
# deps = [
# ":ostream_metric_exporter",
# "@com_google_googletest//:gtest_main",
#],
# ],
#)

cc_test(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW

# include "opentelemetry/sdk/metrics/metric_reader.h"
# include "opentelemetry/version.h"

# include <atomic>
# include <chrono>
# include <condition_variable>
# include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

class MetricExporter;
/**
* Struct to hold PeriodicExortingMetricReader options.
*/

constexpr std::chrono::milliseconds kExportIntervalMillis = std::chrono::milliseconds(60000);
ThomsonTan marked this conversation as resolved.
Show resolved Hide resolved
constexpr std::chrono::milliseconds kExportTimeOutMillis = std::chrono::milliseconds(30000);
struct PeriodicExportingMetricReaderOptions
{

/* The time interval between two consecutive exports. */
std::chrono::milliseconds export_interval_millis =
std::chrono::milliseconds(kExportIntervalMillis);

/* how long the export can run before it is cancelled. */
std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(kExportTimeOutMillis);
};

class PeriodicExportingMetricReader : public MetricReader
{

public:
PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality = AggregationTemporality::kCumulative);

private:
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;

bool OnShutDown(std::chrono::microseconds timeout) noexcept override;

void OnInitialized() noexcept override;

std::unique_ptr<MetricExporter> exporter_;
std::chrono::milliseconds export_interval_millis_;
std::chrono::milliseconds export_timeout_millis_;

void DoBackgroundWork();

/* The background worker thread */
std::thread worker_thread_;

/* Synchronization primitives */
std::condition_variable cv_;
std::mutex cv_m_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
9 changes: 2 additions & 7 deletions sdk/include/opentelemetry/sdk/metrics/metric_exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ class MetricExporter
/**
* Exports a batch of metrics recordables. This method must not be called
* concurrently for the same exporter instance.
* @param spans a span of unique pointers to metrics data
* @param data metrics data
*/
virtual opentelemetry::sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::metrics::MetricData>>
&records) noexcept = 0;
virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &data) noexcept = 0;

/**
* Force flush the exporter.
Expand All @@ -49,9 +47,6 @@ class MetricExporter
*/
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;

private:
AggregationTemporality aggregation_temporality_;
};
} // namespace metrics
} // namespace sdk
Expand Down
1 change: 1 addition & 0 deletions sdk/include/opentelemetry/sdk/metrics/metric_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class MetricReader

virtual void OnInitialized() noexcept {}

protected:
bool IsShutdown() const noexcept;

private:
Expand Down
1 change: 1 addition & 0 deletions sdk/src/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_library(
meter.cc
meter_context.cc
metric_reader.cc
export/periodic_exporting_metric_reader.cc
state/metric_collector.cc
state/sync_metric_storage.cc
aggregation/histogram_aggregation.cc
Expand Down
101 changes: 101 additions & 0 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
# include "opentelemetry/sdk/common/global_log_handler.h"
# include "opentelemetry/sdk/metrics/metric_exporter.h"

# include <chrono>
# include <future>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

PeriodicExportingMetricReader::PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality)
: MetricReader(aggregation_temporality),
exporter_{std::move(exporter)},
export_interval_millis_{option.export_interval_millis},
export_timeout_millis_{option.export_timeout_millis}
{
if (export_interval_millis_ <= export_timeout_millis_)
{
OTEL_INTERNAL_LOG_WARN(
"[Periodic Exporting Metric Reader] Invalid configuration: "
"export_interval_millis_ should be less than export_timeout_millis_, using default values");
export_interval_millis_ = kExportIntervalMillis;
export_timeout_millis_ = kExportTimeOutMillis;
}
}

void PeriodicExportingMetricReader::OnInitialized() noexcept
{
worker_thread_ = std::thread(&PeriodicExportingMetricReader::DoBackgroundWork, this);
}

void PeriodicExportingMetricReader::DoBackgroundWork()
{
std::unique_lock<std::mutex> lk(cv_m_);
do
{
if (IsShutdown())
{
break;
}
std::atomic<bool> cancel_export_for_timeout{false};
auto start = std::chrono::steady_clock::now();
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](MetricData data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(data);
return true;
});
});
std::future_status status;
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);
auto end = std::chrono::steady_clock::now();
auto export_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
auto remaining_wait_interval_ms = export_interval_millis_ - export_time_ms;
cv_.wait_for(lk, remaining_wait_interval_ms);
} while (true);
}

bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
return exporter_->ForceFlush(timeout);
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
{
if (worker_thread_.joinable())
{
cv_.notify_one();
worker_thread_.join();
}
return exporter_->Shutdown(timeout);
}

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
10 changes: 7 additions & 3 deletions sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ MetricReader::MetricReader(AggregationTemporality aggregation_temporality)
void MetricReader::SetMetricProducer(MetricProducer *metric_producer)
{
metric_producer_ = metric_producer;
OnInitialized();
}

AggregationTemporality MetricReader::GetAggregationTemporality() const noexcept
Expand All @@ -46,18 +47,21 @@ bool MetricReader::Collect(nostd::function_ref<bool(MetricData)> callback) noexc
bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
{
bool status = true;

if (IsShutdown())
{
OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!");
}

{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
shutdown_ = true;
}

if (!OnShutDown(timeout))
{
status = false;
OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!");
}
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
shutdown_ = true;
return status;
}

Expand Down
3 changes: 2 additions & 1 deletion sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ foreach(
observer_result_test
sync_instruments_test
async_instruments_test
metric_reader_test)
metric_reader_test
periodic_exporting_metric_reader_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(
${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
Expand Down
3 changes: 1 addition & 2 deletions sdk/test/metrics/meter_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ class MockMetricExporter : public MetricExporter

public:
MockMetricExporter() = default;
opentelemetry::sdk::common::ExportResult Export(
const opentelemetry::nostd::span<std::unique_ptr<MetricData>> &records) noexcept override
opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept override
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}
Expand Down
81 changes: 81 additions & 0 deletions sdk/test/metrics/periodic_exporting_metric_reader_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW

# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
# include "opentelemetry/sdk/metrics/export/metric_producer.h"
# include "opentelemetry/sdk/metrics/metric_exporter.h"

# include <gtest/gtest.h>

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationlibrary;
using namespace opentelemetry::sdk::metrics;

class MockPushMetricExporter : public MetricExporter
{
public:
opentelemetry::sdk::common::ExportResult Export(const MetricData &record) noexcept override
{
records_.push_back(record);
return opentelemetry::sdk::common::ExportResult::kSuccess;
}

bool ForceFlush(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override
{
return false;
}

bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{
return true;
}

size_t GetDataCount() { return records_.size(); }

private:
std::vector<MetricData> records_;
};

class MockMetricProducer : public MetricProducer
{
public:
MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero())
: sleep_ms_{sleep_ms}, data_sent_size_(0)
{}

bool Collect(nostd::function_ref<bool(MetricData)> callback) noexcept override
{
std::this_thread::sleep_for(sleep_ms_);
data_sent_size_++;
MetricData data;
callback(data);
return true;
}

size_t GetDataCount() { return data_sent_size_; }

private:
std::chrono::microseconds sleep_ms_;
size_t data_sent_size_;
};

TEST(PeriodicExporingMetricReader, BasicTests)
{
std::unique_ptr<MetricExporter> exporter(new MockPushMetricExporter());
PeriodicExportingMetricReaderOptions options;
options.export_timeout_millis = std::chrono::milliseconds(200);
options.export_interval_millis = std::chrono::milliseconds(500);
auto exporter_ptr = exporter.get();
PeriodicExportingMetricReader reader(std::move(exporter), options);
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
reader.Shutdown();
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
}

#endif