Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement periodic exporting metric reader #1286

Merged
merged 21 commits into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once
#ifndef ENABLE_METRICS_PREVIEW

# include "opentelemetry/sdk/metrics/metric_reader.h"
# include "opentelemetry/version.h"

# include <atomic>
# include <chrono>
# include <condition_variable>
# include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

class MetricExporter;
/**
* Struct to hold batch PeriodicExortingMetricReader options.
*/

struct PeriodicExportingMetricReaderOptions
{

/* The time interval between two consecutive exports. */
std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(60000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name schedule_delay_millis might be misleading.

The suggested name is export_interval_millis https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#periodic-exporting-metricreader

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, if export 1 runs during [T1, T2] and export 2 runs during [T3, T4], export_interval_millis = T3 - T1, while schedule_delay_millis = T3 - T2.

Copy link
Member Author

@lalitb lalitb Mar 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, changed the variable name to export_interval_millis, and also logic to ensure there is exactly export_interval_millis ms difference between two exports (by adjusting the time taken for export operation).


/* how long the export can run before it is cancelled. */
std::chrono::milliseconds export_timeout_millis = std::chrono::milliseconds(30000);
};

class PeriodicExportingMetricReader : public MetricReader
{

public:
PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality = AggregationTemporality::kCummulative);

private:
bool OnForceFlush(std::chrono::microseconds timeout) noexcept override;

bool OnShutDown(std::chrono::microseconds timeout) noexcept override;

void OnInitialized() noexcept override;

std::unique_ptr<MetricExporter> exporter_;
std::chrono::milliseconds schedule_delay_millis_;
std::chrono::milliseconds export_timeout_millis_;

void DoBackgroundWork();

/* The background worker thread */
std::thread worker_thread_;

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
esigo marked this conversation as resolved.
Show resolved Hide resolved
std::mutex cv_m_, force_flush_cv_m_, shutdown_m_;
};

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
7 changes: 1 addition & 6 deletions sdk/include/opentelemetry/sdk/metrics/metric_exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ class MetricExporter
* concurrently for the same exporter instance.
* @param spans a span of unique pointers to metrics data
esigo marked this conversation as resolved.
Show resolved Hide resolved
*/
virtual opentelemetry::sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::metrics::MetricData>>
&records) noexcept = 0;
virtual opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept = 0;

/**
* Force flush the exporter.
Expand All @@ -49,9 +47,6 @@ class MetricExporter
*/
virtual bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0;

private:
AggregationTemporality aggregation_temporality_;
};
} // namespace metrics
} // namespace sdk
Expand Down
6 changes: 3 additions & 3 deletions sdk/include/opentelemetry/sdk/metrics/metric_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ class MetricProducer;
class MetricReader
{
public:
MetricReader(
AggregationTemporality aggregation_temporality = AggregationTemporality::kCummulative);
MetricReader(AggregationTemporality aggr_temp = AggregationTemporality::kCummulative);

void SetMetricProducer(MetricProducer *metric_producer);

Expand All @@ -51,13 +50,14 @@ class MetricReader

virtual ~MetricReader() = default;

private:
// private:
virtual bool OnForceFlush(std::chrono::microseconds timeout) noexcept = 0;

virtual bool OnShutDown(std::chrono::microseconds timeout) noexcept = 0;

virtual void OnInitialized() noexcept {}

protected:
bool IsShutdown() const noexcept;

private:
Expand Down
1 change: 1 addition & 0 deletions sdk/src/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ add_library(
meter.cc
meter_context.cc
metric_reader.cc
export/periodic_exporting_metric_reader.cc
state/metric_collector.cc
aggregation/histogram_aggregation.cc
aggregation/lastvalue_aggregation.cc
Expand Down
90 changes: 90 additions & 0 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW
# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
# include "opentelemetry/sdk/common/global_log_handler.h"
# include "opentelemetry/sdk/metrics/metric_exporter.h"

# include <chrono>
# include <future>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace metrics
{

PeriodicExportingMetricReader::PeriodicExportingMetricReader(
std::unique_ptr<MetricExporter> exporter,
const PeriodicExportingMetricReaderOptions &option,
AggregationTemporality aggregation_temporality)
: MetricReader(aggregation_temporality),
exporter_{std::move(exporter)},
schedule_delay_millis_{option.schedule_delay_millis},
export_timeout_millis_{option.export_timeout_millis}
{}

void PeriodicExportingMetricReader::OnInitialized() noexcept
{
worker_thread_ = std::thread(&PeriodicExportingMetricReader::DoBackgroundWork, this);
}

void PeriodicExportingMetricReader::DoBackgroundWork()
{
auto timeout = schedule_delay_millis_;
std::unique_lock<std::mutex> lk(cv_m_);
do
{
cv_.wait_for(lk, timeout);
if (IsShutdown())
{
break;
}
std::atomic<bool> cancel_export_for_timeout{false};
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](MetricData data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(data);
return true;
});
});
std::future_status status;
do
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);

} while (true);
}

bool PeriodicExportingMetricReader::OnForceFlush(std::chrono::microseconds timeout) noexcept
{
return exporter_->ForceFlush(timeout);
}

bool PeriodicExportingMetricReader::OnShutDown(std::chrono::microseconds timeout) noexcept
{
if (worker_thread_.joinable())
{
cv_.notify_one();
worker_thread_.join();
}
return exporter_->Shutdown(timeout);
}

} // namespace metrics
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
#endif
9 changes: 7 additions & 2 deletions sdk/src/metrics/metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ MetricReader::MetricReader(AggregationTemporality aggregation_temporality)
void MetricReader::SetMetricProducer(MetricProducer *metric_producer)
{
metric_producer_ = metric_producer;
OnInitialized();
}

AggregationTemporality MetricReader::GetAggregationTemporality() const noexcept
Expand All @@ -46,18 +47,22 @@ bool MetricReader::Collect(nostd::function_ref<bool(MetricData)> callback) noexc
bool MetricReader::Shutdown(std::chrono::microseconds timeout) noexcept
{
bool status = true;

if (IsShutdown())
{
OTEL_INTERNAL_LOG_WARN("MetricReader::Shutdown - Cannot invoke shutdown twice!");
}

{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
shutdown_ = true;
}

if (!OnShutDown(timeout))
{
status = false;
OTEL_INTERNAL_LOG_WARN("MetricReader::OnShutDown Shutdown failed. Will not be tried again!");
}
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
esigo marked this conversation as resolved.
Show resolved Hide resolved
shutdown_ = true;
return status;
}

Expand Down
3 changes: 2 additions & 1 deletion sdk/test/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ foreach(
observer_result_test
sync_instruments_test
async_instruments_test
metric_reader_test)
metric_reader_test
periodic_exporting_metric_reader_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(
${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}
Expand Down
3 changes: 1 addition & 2 deletions sdk/test/metrics/meter_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ class MockMetricExporter : public MetricExporter

public:
MockMetricExporter() = default;
opentelemetry::sdk::common::ExportResult Export(
const opentelemetry::nostd::span<std::unique_ptr<MetricData>> &records) noexcept override
opentelemetry::sdk::common::ExportResult Export(const MetricData &records) noexcept override
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}
Expand Down
81 changes: 81 additions & 0 deletions sdk/test/metrics/periodic_exporting_metric_reader_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifndef ENABLE_METRICS_PREVIEW

# include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
# include "opentelemetry/sdk/metrics/export/metric_producer.h"
# include "opentelemetry/sdk/metrics/metric_exporter.h"

# include <gtest/gtest.h>

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationlibrary;
using namespace opentelemetry::sdk::metrics;

class MockPushMetricExporter : public MetricExporter
{
public:
opentelemetry::sdk::common::ExportResult Export(const MetricData &record) noexcept override
{
records.push_back(record);
return opentelemetry::sdk::common::ExportResult::kSuccess;
}

bool ForceFlush(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override
{
return false;
}

bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{
return true;
}

size_t GetDataCount() { return records.size(); }

private:
std::vector<MetricData> records;
esigo marked this conversation as resolved.
Show resolved Hide resolved
};

class MockMetricProducer : public MetricProducer
{
public:
MockMetricProducer(std::chrono::microseconds sleep_ms = std::chrono::microseconds::zero())
: sleep_ms_{sleep_ms}, data_sent_size_(0)
{}

bool Collect(nostd::function_ref<bool(MetricData)> callback) noexcept override
{
std::this_thread::sleep_for(sleep_ms_);
data_sent_size_++;
MetricData data;
callback(data);
return true;
}

size_t GetDataCount() { return data_sent_size_; }

private:
std::chrono::microseconds sleep_ms_;
size_t data_sent_size_;
};

TEST(PeriodicExporingMetricReader, BasicTests)
{
std::unique_ptr<MetricExporter> exporter(new MockPushMetricExporter());
PeriodicExportingMetricReaderOptions options;
options.export_timeout_millis = std::chrono::milliseconds(2000);
options.schedule_delay_millis = std::chrono::milliseconds(500);
auto exporter_ptr = exporter.get();
PeriodicExportingMetricReader reader(std::move(exporter), options);
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
reader.Shutdown();
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
}

#endif