Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reorder the destructor of members in LoggerProvider and TracerProvider #1245

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,8 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable
void SetSeverity(opentelemetry::logs::Severity severity) noexcept override
{
// Convert the severity enum to a string
int severity_index = static_cast<int>(severity);
if (severity_index < 0 ||
severity_index >= std::extent<decltype(opentelemetry::logs::SeverityNumToText)>::value)
std::uint32_t severity_index = static_cast<std::uint32_t>(severity);
if (severity_index >= std::extent<decltype(opentelemetry::logs::SeverityNumToText)>::value)
{
std::stringstream sout;
sout << "Invalid severity(" << severity_index << ")";
Expand Down
7 changes: 3 additions & 4 deletions exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,11 @@ sdk::common::ExportResult OStreamLogExporter::Export(
// into severity_num and severity_text
sout_ << "{\n"
<< " timestamp : " << log_record->GetTimestamp().time_since_epoch().count() << "\n"
<< " severity_num : " << static_cast<int>(log_record->GetSeverity()) << "\n"
<< " severity_num : " << static_cast<std::uint32_t>(log_record->GetSeverity()) << "\n"
<< " severity_text : ";

int severity_index = static_cast<int>(log_record->GetSeverity());
if (severity_index < 0 ||
severity_index >= std::extent<decltype(opentelemetry::logs::SeverityNumToText)>::value)
std::uint32_t severity_index = static_cast<std::uint32_t>(log_record->GetSeverity());
if (severity_index >= std::extent<decltype(opentelemetry::logs::SeverityNumToText)>::value)
{
sout_ << "Invalid severity(" << severity_index << ")\n";
}
Expand Down
5 changes: 5 additions & 0 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(
<< " span(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}
if (spans.empty())
{
return sdk::common::ExportResult::kSuccess;
}

proto::collector::trace::v1::ExportTraceServiceRequest request;
OtlpRecordableUtils::PopulateRequest(spans, &request);

Expand Down
5 changes: 5 additions & 0 deletions exporters/otlp/src/otlp_grpc_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export(
<< " log(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}
if (logs.empty())
{
return sdk::common::ExportResult::kSuccess;
}

proto::collector::logs::v1::ExportLogsServiceRequest request;
OtlpRecordableUtils::PopulateRequest(logs, &request);

Expand Down
5 changes: 5 additions & 0 deletions exporters/otlp/src/otlp_http_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ std::unique_ptr<opentelemetry::sdk::trace::Recordable> OtlpHttpExporter::MakeRec
opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::trace::Recordable>> &spans) noexcept
{
if (spans.empty())
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}

proto::collector::trace::v1::ExportTraceServiceRequest service_request;
OtlpRecordableUtils::PopulateRequest(spans, &service_request);
return http_client_->Export(service_request);
Expand Down
4 changes: 4 additions & 0 deletions exporters/otlp/src/otlp_http_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ std::unique_ptr<opentelemetry::sdk::logs::Recordable> OtlpHttpLogExporter::MakeR
opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export(
const nostd::span<std::unique_ptr<opentelemetry::sdk::logs::Recordable>> &logs) noexcept
{
if (logs.empty())
{
return opentelemetry::sdk::common::ExportResult::kSuccess;
}
proto::collector::logs::v1::ExportLogsServiceRequest service_request;
OtlpRecordableUtils::PopulateRequest(logs, &service_request);
return http_client_->Export(service_request);
Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/test/otlp_http_log_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest)

auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 1)));
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5)));

std::string report_trace_id;
std::string report_span_id;
Expand Down Expand Up @@ -192,7 +192,7 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest)

auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 1)));
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5)));

std::string report_trace_id;
std::string report_span_id;
Expand Down
4 changes: 3 additions & 1 deletion ext/test/http/curl_http_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,9 @@ TEST_F(BasicCurlHttpTests, SendGetRequestSyncTimeout)
auto result = http_client.Get("http://222.222.222.200:19000/get/", m1);
EXPECT_EQ(result, false);

EXPECT_EQ(result.GetSessionState(), http_client::SessionState::ConnectFailed);
// When network is under proxy, it may connect success but closed by peer when send data
EXPECT_TRUE(result.GetSessionState() == http_client::SessionState::ConnectFailed ||
result.GetSessionState() == http_client::SessionState::SendFailed);
}

TEST_F(BasicCurlHttpTests, SendPostRequestSync)
Expand Down
5 changes: 4 additions & 1 deletion sdk/include/opentelemetry/sdk/_metrics/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ class PushController
{
if (active_.exchange(false))
{
runner_.join();
if (runner_.joinable())
{
runner_.join();
}
tick(); // flush metrics sitting in the processor
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class BatchLogProcessor : public LogProcessor

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_;
std::mutex cv_m_, force_flush_cv_m_, shutdown_m_;

/* The buffer/queue to which the ended logs are added */
common::CircularBuffer<Recordable> buffer_;
Expand Down
5 changes: 3 additions & 2 deletions sdk/include/opentelemetry/sdk/logs/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ class Logger final : public opentelemetry::logs::Logger
// The name of this logger
std::string logger_name_;

// The logger context of this Logger. Uses a weak_ptr to avoid cyclic dependency issues the with
std::weak_ptr<LoggerContext> context_;
// order of declaration is important here - instrumentation library should destroy after
// logger-context.
std::unique_ptr<instrumentationlibrary::InstrumentationLibrary> instrumentation_library_;
esigo marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<LoggerContext> context_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is cyclic dependency still an issue here as it changes weak_ptr to shared_ptr?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no cyclic dependency right now.It's only be hold in Logger and LoggerProvider and Logger is only hold by LoggerProvider.

};

} // namespace logs
Expand Down
20 changes: 14 additions & 6 deletions sdk/include/opentelemetry/sdk/logs/logger_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class LoggerProvider final : public opentelemetry::logs::LoggerProvider
*/
explicit LoggerProvider(std::shared_ptr<sdk::logs::LoggerContext> context) noexcept;

~LoggerProvider();

/**
* Creates a logger with the given name, and returns a shared pointer to it.
* If a logger with that name already exists, return a shared pointer to it
Expand Down Expand Up @@ -107,14 +109,20 @@ class LoggerProvider final : public opentelemetry::logs::LoggerProvider
*/
const opentelemetry::sdk::resource::Resource &GetResource() const noexcept;

private:
// A pointer to the processor stored by this logger provider
std::shared_ptr<sdk::logs::LoggerContext> context_;
/**
* Shutdown the log processor associated with this log provider.
*/
bool Shutdown() noexcept;
esigo marked this conversation as resolved.
Show resolved Hide resolved

// A vector of pointers to all the loggers that have been created
std::vector<std::shared_ptr<opentelemetry::sdk::logs::Logger>> loggers_;
/**
* Force flush the log processor associated with this log provider.
*/
bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

// A mutex that ensures only one thread is using the map of loggers
private:
// order of declaration is important here - loggers should destroy only after context.
std::vector<std::shared_ptr<opentelemetry::sdk::logs::Logger>> loggers_;
std::shared_ptr<sdk::logs::LoggerContext> context_;
std::mutex lock_;
};
} // namespace logs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class BatchSpanProcessor : public SpanProcessor

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_;
std::mutex cv_m_, force_flush_cv_m_, shutdown_m_;

/* The buffer/queue to which the ended spans are added */
common::CircularBuffer<Recordable> buffer_;
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/tracer_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class TracerProvider final : public opentelemetry::trace::TracerProvider
*/
explicit TracerProvider(std::shared_ptr<sdk::trace::TracerContext> context) noexcept;

~TracerProvider();

opentelemetry::nostd::shared_ptr<opentelemetry::trace::Tracer> GetTracer(
nostd::string_view library_name,
nostd::string_view library_version = "",
Expand Down
14 changes: 10 additions & 4 deletions sdk/src/logs/batch_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,17 @@ void BatchLogProcessor::DrainQueue()

bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true);
std::lock_guard<std::mutex> shutdown_guard{shutdown_m_};
bool already_shutdown = is_shutdown_.exchange(true);

cv_.notify_one();
worker_thread_.join();
if (exporter_ != nullptr)
if (worker_thread_.joinable())
{
cv_.notify_one();
worker_thread_.join();
}

// Should only shutdown exporter ONCE.
if (!already_shutdown && exporter_ != nullptr)
{
return exporter_->Shutdown();
}
Expand Down
11 changes: 5 additions & 6 deletions sdk/src/logs/logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Logger::Logger(nostd::string_view name,
std::unique_ptr<instrumentationlibrary::InstrumentationLibrary>
instrumentation_library) noexcept
: logger_name_(std::string(name)),
context_(context),
instrumentation_library_{std::move(instrumentation_library)}
instrumentation_library_(std::move(instrumentation_library)),
context_(context)
{}

const nostd::string_view Logger::GetName() noexcept
Expand All @@ -45,12 +45,11 @@ void Logger::Log(opentelemetry::logs::Severity severity,
common::SystemTimestamp timestamp) noexcept
{
// If this logger does not have a processor, no need to create a log record
auto context = context_.lock();
if (!context)
if (!context_)
{
return;
}
auto &processor = context->GetProcessor();
auto &processor = context_->GetProcessor();

// TODO: Sampler (should include check for minSeverity)

Expand All @@ -68,7 +67,7 @@ void Logger::Log(opentelemetry::logs::Severity severity,
recordable->SetBody(body);
recordable->SetInstrumentationLibrary(GetInstrumentationLibrary());

recordable->SetResource(context->GetResource());
recordable->SetResource(context_->GetResource());

attributes.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept {
recordable->SetAttribute(key, value);
Expand Down
21 changes: 21 additions & 0 deletions sdk/src/logs/logger_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ LoggerProvider::LoggerProvider(std::shared_ptr<sdk::logs::LoggerContext> context
: context_{context}
{}

LoggerProvider::~LoggerProvider()
{
// Logger hold the shared pointer to the context. So we can not use destructor of LoggerContext to
// Shutdown and flush all pending recordables when we hasve more than one loggers.These
// recordables may use the raw pointer of instrumentation_library_ in Logger
if (context_)
{
context_->Shutdown();
}
}

nostd::shared_ptr<opentelemetry::logs::Logger> LoggerProvider::GetLogger(
nostd::string_view logger_name,
nostd::string_view options,
Expand Down Expand Up @@ -105,6 +116,16 @@ const opentelemetry::sdk::resource::Resource &LoggerProvider::GetResource() cons
return context_->GetResource();
}

bool LoggerProvider::Shutdown() noexcept
{
return context_->Shutdown();
}

bool LoggerProvider::ForceFlush(std::chrono::microseconds timeout) noexcept
{
return context_->ForceFlush(timeout);
}

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Expand Down
6 changes: 2 additions & 4 deletions sdk/src/logs/multi_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,8 @@ bool MultiLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
}
for (auto &processor : processors_)
{
if (!processor->Shutdown(std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns)))
{
result = false;
}
result |=
ThomsonTan marked this conversation as resolved.
Show resolved Hide resolved
processor->Shutdown(std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns));
start_time = std::chrono::system_clock::now();
if (expire_time > start_time)
{
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/logs/simple_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
// Should only shutdown exporter ONCE.
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
if (!shutdown_latch_.test_and_set(std::memory_order_acquire) && exporter_ != nullptr)
{
return exporter_->Shutdown(timeout);
}

return false;
return true;
}
} // namespace logs
} // namespace sdk
Expand Down
14 changes: 10 additions & 4 deletions sdk/src/trace/batch_span_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,17 @@ void BatchSpanProcessor::DrainQueue()

bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true);
std::lock_guard<std::mutex> shutdown_guard{shutdown_m_};
bool already_shutdown = is_shutdown_.exchange(true);

cv_.notify_one();
worker_thread_.join();
if (exporter_ != nullptr)
if (worker_thread_.joinable())
{
cv_.notify_one();
worker_thread_.join();
}

// Should only shutdown exporter ONCE.
if (!already_shutdown && exporter_ != nullptr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is showing down is going, later call to Shutdown will just return true even if the previous Shutdown has not been completed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes and thanks, it's definitely a problem.I add a mutex lock to prevent this.

{
return exporter_->Shutdown();
}
Expand Down
11 changes: 11 additions & 0 deletions sdk/src/trace/tracer_provider.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ TracerProvider::TracerProvider(std::vector<std::unique_ptr<SpanProcessor>> &&pro
std::move(id_generator));
}

TracerProvider::~TracerProvider()
{
// Tracer hold the shared pointer to the context. So we can not use destructor of TracerContext to
// Shutdown and flush all pending recordables when we have more than one tracers.These recordables
// may use the raw pointer of instrumentation_library_ in Tracer
if (context_)
{
context_->Shutdown();
}
}

nostd::shared_ptr<trace_api::Tracer> TracerProvider::GetTracer(
nostd::string_view library_name,
nostd::string_view library_version,
Expand Down
2 changes: 2 additions & 0 deletions sdk/test/logs/batch_log_processor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ TEST_F(BatchLogProcessorTest, TestShutdown)
// current batch of logs to be sent to the log exporter
// by checking the number of logs sent and the names of the logs sent
EXPECT_EQ(true, batch_processor->Shutdown());
// It's safe to shutdown again
EXPECT_TRUE(batch_processor->Shutdown());

EXPECT_EQ(num_logs, logs_received->size());

Expand Down
27 changes: 27 additions & 0 deletions sdk/test/logs/logger_provider_sdk_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# include "opentelemetry/sdk/logs/log_record.h"
# include "opentelemetry/sdk/logs/logger.h"
# include "opentelemetry/sdk/logs/logger_provider.h"
# include "opentelemetry/sdk/logs/simple_log_processor.h"

# include <gtest/gtest.h>

Expand Down Expand Up @@ -103,4 +104,30 @@ TEST(LoggerProviderSDK, GetResource)
LoggerProvider lp{nullptr, resource};
ASSERT_EQ(nostd::get<std::string>(lp.GetResource().GetAttributes().at("key")), "value");
}

TEST(LoggerProviderSDK, Shutdown)
{
std::unique_ptr<SimpleLogProcessor> processor(new SimpleLogProcessor(nullptr));
esigo marked this conversation as resolved.
Show resolved Hide resolved
std::vector<std::unique_ptr<LogProcessor>> processors;
processors.push_back(std::move(processor));

LoggerProvider lp(std::make_shared<LoggerContext>(std::move(processors)));

EXPECT_TRUE(lp.Shutdown());

// It's safe to shutdown again
EXPECT_TRUE(lp.Shutdown());
}

TEST(LoggerProviderSDK, ForceFlush)
{
std::unique_ptr<SimpleLogProcessor> processor(new SimpleLogProcessor(nullptr));
std::vector<std::unique_ptr<LogProcessor>> processors;
processors.push_back(std::move(processor));

LoggerProvider lp(std::make_shared<LoggerContext>(std::move(processors)));

EXPECT_TRUE(lp.ForceFlush());
}

#endif
Loading