Skip to content

Commit

Permalink
[SDK] Fix crash in PeriodicExportingMetricReader. (#2983)
Browse files Browse the repository at this point in the history
  • Loading branch information
owent authored Jul 19, 2024
1 parent f195b9e commit 4520aa5
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 36 deletions.
31 changes: 31 additions & 0 deletions api/include/opentelemetry/common/macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,37 @@ point.

#endif

// OPENTELEMETRY_HAVE_EXCEPTIONS
//
// Checks whether the compiler both supports and enables exceptions. Many
// compilers support a "no exceptions" mode that disables exceptions.
//
// After this block the macro is always defined, to either 1 (exceptions
// available) or 0 (exceptions unavailable), so test it with
// `#if OPENTELEMETRY_HAVE_EXCEPTIONS` rather than `#ifdef`. A value that is
// already defined before this point (e.g. by the build system) is kept as is.
//
// Generally, when OPENTELEMETRY_HAVE_EXCEPTIONS is 0:
//
// * Code using `throw` and `try` may not compile.
// * The `noexcept` specifier will still compile and behave as normal.
// * The `noexcept` operator may still return `false`.
//
// For further details, consult the compiler's documentation.
#ifndef OPENTELEMETRY_HAVE_EXCEPTIONS
# if defined(__clang__) && ((__clang_major__ * 100) + __clang_minor__) < 306
// Clang < 3.6: require both the __EXCEPTIONS macro and the cxx_exceptions feature check.
// http://releases.llvm.org/3.6.0/tools/clang/docs/ReleaseNotes.html#the-exceptions-macro
# if defined(__EXCEPTIONS) && OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions)
# define OPENTELEMETRY_HAVE_EXCEPTIONS 1
# endif // defined(__EXCEPTIONS) && OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions)
// Any other compiler for which the cxx_exceptions feature check succeeds.
# elif OPENTELEMETRY_HAVE_FEATURE(cxx_exceptions)
# define OPENTELEMETRY_HAVE_EXCEPTIONS 1
// Handle remaining special cases and default to exceptions being supported.
// Exceptions are treated as disabled only for a __GNUC__ compiler that defines
// neither __EXCEPTIONS nor __cpp_exceptions, or for MSVC without _CPPUNWIND.
# elif !(defined(__GNUC__) && !defined(__EXCEPTIONS) && !defined(__cpp_exceptions)) && \
!(defined(_MSC_VER) && !defined(_CPPUNWIND))
# define OPENTELEMETRY_HAVE_EXCEPTIONS 1
# endif
#endif
// No branch above enabled the macro: record explicitly that exceptions are off.
#ifndef OPENTELEMETRY_HAVE_EXCEPTIONS
# define OPENTELEMETRY_HAVE_EXCEPTIONS 0
#endif

/*
OPENTELEMETRY_ATTRIBUTE_LIFETIME_BOUND indicates that a resource owned by a function
parameter or implicit object parameter is retained by the return value of the
Expand Down
1 change: 1 addition & 0 deletions examples/plugin/plugin/tracer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "opentelemetry/common/attribute_value.h"
#include "opentelemetry/common/timestamp.h"
#include "opentelemetry/context/context_value.h"
#include "opentelemetry/nostd/utility.h"
#include "opentelemetry/trace/span_context.h"
#include "opentelemetry/trace/span_metadata.h"
#include "tracer.h"
Expand Down
22 changes: 12 additions & 10 deletions exporters/otlp/src/otlp_file_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
// clang-format on

#include "google/protobuf/message.h"
#include "google/protobuf/reflection.h"
#include "google/protobuf/stubs/common.h"
#include "nlohmann/json.hpp"

// clang-format off
Expand All @@ -28,15 +26,26 @@
#include "opentelemetry/sdk/common/global_log_handler.h"
#include "opentelemetry/version.h"

#ifdef _MSC_VER
# include <string.h>
# define strcasecmp _stricmp
#else
# include <strings.h>
#endif

#include <limits.h>
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fstream>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#if !defined(__CYGWIN__) && defined(_WIN32)
Expand Down Expand Up @@ -64,11 +73,8 @@

#else

# include <dirent.h>
# include <errno.h>
# include <fcntl.h>
# include <sys/stat.h>
# include <sys/types.h>
# include <unistd.h>

# define FS_ACCESS(x) access(x, F_OK)
Expand All @@ -89,10 +95,6 @@
# undef GetMessage
#endif

#ifdef _MSC_VER
# define strcasecmp _stricmp
#endif

#if (defined(_MSC_VER) && _MSC_VER >= 1600) || \
(defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L) || defined(__STDC_LIB_EXT1__)
# ifdef _MSC_VER
Expand Down
77 changes: 56 additions & 21 deletions sdk/src/metrics/export/periodic_exporting_metric_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <thread>
#include <utility>

#include "opentelemetry/common/macros.h"
#include "opentelemetry/common/timestamp.h"
#include "opentelemetry/sdk/common/global_log_handler.h"
#include "opentelemetry/sdk/metrics/export/metric_producer.h"
Expand All @@ -29,6 +30,10 @@
# include <future>
#endif

#if OPENTELEMETRY_HAVE_EXCEPTIONS
# include <exception>
#endif

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
Expand Down Expand Up @@ -90,31 +95,61 @@ void PeriodicExportingMetricReader::DoBackgroundWork()
bool PeriodicExportingMetricReader::CollectAndExportOnce()
{
std::atomic<bool> cancel_export_for_timeout{false};
auto future_receive = std::async(std::launch::async, [this, &cancel_export_for_timeout] {
Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
if (cancel_export_for_timeout)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(metric_data);
return true;
});
});

std::future_status status;
std::uint64_t notify_force_flush = force_flush_pending_sequence_.load(std::memory_order_acquire);
do
std::unique_ptr<std::thread> task_thread;

#if OPENTELEMETRY_HAVE_EXCEPTIONS
try
{
status = future_receive.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
#endif
std::promise<void> sender;
auto receiver = sender.get_future();

This comment has been minimized.

Copy link
@tristan-lanfrey

tristan-lanfrey Aug 14, 2024

`sender` is not passed to the thread, so its value is never set.
As a result, `receiver` will never become ready, and the `wait_for` below will always block for the full timeout.

You probably want to pass the promise into the thread and replace `return true` with `sender.set_value();` (`sender` is a `std::promise<void>`, so `set_value` takes no argument), so the thread can unblock `receiver.wait_for` as soon as possible.

This comment has been minimized.

Copy link
@tristan-lanfrey

tristan-lanfrey Aug 14, 2024

Or maybe don't change the `return true`, since it looks like it belongs to a nested callback :) But something must fulfil the promise somewhere, and as far as I can tell this code never does.


task_thread.reset(new std::thread([this, &cancel_export_for_timeout] {
this->Collect([this, &cancel_export_for_timeout](ResourceMetrics &metric_data) {
if (cancel_export_for_timeout.load(std::memory_order_acquire))
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect took longer configured time: "
<< this->export_timeout_millis_.count() << " ms, and timed out");
return false;
}
this->exporter_->Export(metric_data);
return true;
});
}));

std::future_status status;
do
{
cancel_export_for_timeout = true;
break;
}
} while (status != std::future_status::ready);
status = receiver.wait_for(std::chrono::milliseconds(export_timeout_millis_));
if (status == std::future_status::timeout)
{
cancel_export_for_timeout.store(true, std::memory_order_release);
break;
}
} while (status != std::future_status::ready);
#if OPENTELEMETRY_HAVE_EXCEPTIONS
}
catch (std::exception &e)
{
OTEL_INTERNAL_LOG_ERROR("[Periodic Exporting Metric Reader] Collect failed with exception "
<< e.what());
return false;
}
catch (...)
{
OTEL_INTERNAL_LOG_ERROR(
"[Periodic Exporting Metric Reader] Collect failed with unknown exception");
return false;
}
#endif

if (task_thread && task_thread->joinable())
{
task_thread->join();
}

std::uint64_t notified_sequence = force_flush_notified_sequence_.load(std::memory_order_acquire);
while (notify_force_flush > notified_sequence)
Expand Down
38 changes: 33 additions & 5 deletions sdk/test/metrics/periodic_exporting_metric_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,25 @@

#include <gtest/gtest.h>

#include <chrono>
#include <memory>
#include <thread>

using namespace opentelemetry;
using namespace opentelemetry::sdk::instrumentationscope;
using namespace opentelemetry::sdk::metrics;

class MockPushMetricExporter : public PushMetricExporter
{
public:
MockPushMetricExporter(std::chrono::milliseconds wait) : wait_(wait) {}

opentelemetry::sdk::common::ExportResult Export(const ResourceMetrics &record) noexcept override
{
  // Simulate a slow exporter: block the calling thread for the configured
  // delay, but only when that delay is strictly positive.
  const bool has_delay = std::chrono::milliseconds::zero() < wait_;
  if (has_delay)
  {
    std::this_thread::sleep_for(wait_);
  }

  // Keep a copy of every exported batch in records_.
  records_.push_back(record);

  return opentelemetry::sdk::common::ExportResult::kSuccess;
}
Expand All @@ -34,6 +44,7 @@ class MockPushMetricExporter : public PushMetricExporter

private:
std::vector<ResourceMetrics> records_;
std::chrono::milliseconds wait_;
};

class MockMetricProducer : public MetricProducer
Expand Down Expand Up @@ -61,17 +72,34 @@ class MockMetricProducer : public MetricProducer

TEST(PeriodicExporingMetricReader, BasicTests)
{
std::unique_ptr<PushMetricExporter> exporter(new MockPushMetricExporter());
std::unique_ptr<PushMetricExporter> exporter(
new MockPushMetricExporter(std::chrono::milliseconds{0}));
PeriodicExportingMetricReaderOptions options;
options.export_timeout_millis = std::chrono::milliseconds(200);
options.export_interval_millis = std::chrono::milliseconds(500);
auto exporter_ptr = exporter.get();
PeriodicExportingMetricReader reader(std::move(exporter), options);
std::shared_ptr<PeriodicExportingMetricReader> reader =
std::make_shared<PeriodicExportingMetricReader>(std::move(exporter), options);
MockMetricProducer producer;
reader.SetMetricProducer(&producer);
reader->SetMetricProducer(&producer);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
EXPECT_NO_THROW(reader.ForceFlush());
reader.Shutdown();
EXPECT_NO_THROW(reader->ForceFlush());
reader->Shutdown();
EXPECT_EQ(static_cast<MockPushMetricExporter *>(exporter_ptr)->GetDataCount(),
static_cast<MockMetricProducer *>(&producer)->GetDataCount());
}

TEST(PeriodicExporingMetricReader, Timeout)
{
  // Each Export() call on the mock sleeps for 2000 ms, far longer than the
  // 200 ms export timeout configured below. The test makes no assertions: it
  // passes when the reader runs for a second and shuts down without crashing.
  const std::chrono::milliseconds exporter_delay{2000};
  std::unique_ptr<PushMetricExporter> slow_exporter(new MockPushMetricExporter(exporter_delay));

  PeriodicExportingMetricReaderOptions reader_options;
  reader_options.export_timeout_millis  = std::chrono::milliseconds(200);
  reader_options.export_interval_millis = std::chrono::milliseconds(500);

  auto reader =
      std::make_shared<PeriodicExportingMetricReader>(std::move(slow_exporter), reader_options);

  MockMetricProducer producer;
  reader->SetMetricProducer(&producer);

  // Let the reader run past its first 500 ms export interval before shutting down.
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  reader->Shutdown();
}

0 comments on commit 4520aa5

Please sign in to comment.