Skip to content

Commit

Permalink
Synchronized calls to Exporter::Export & Shutdown (#1164)
Browse files Browse the repository at this point in the history
  • Loading branch information
esigo authored Jan 5, 2022
1 parent 1f0bf83 commit 1688c7c
Show file tree
Hide file tree
Showing 19 changed files with 138 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#ifdef ENABLE_LOGS_PREVIEW

# include "nlohmann/json.hpp"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/ext/http/client/curl/http_client_curl.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
Expand Down Expand Up @@ -104,6 +105,8 @@ class ElasticsearchLogExporter final : public opentelemetry::sdk::logs::LogExpor

// Object that stores the HTTP sessions that have been created
std::unique_ptr<ext::http::client::HttpClient> http_client_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};
} // namespace logs
} // namespace exporter
Expand Down
14 changes: 11 additions & 3 deletions exporters/elasticsearch/src/es_log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

# include <sstream> // std::stringstream

# include <mutex>
# include "opentelemetry/exporters/elasticsearch/es_log_exporter.h"
# include "opentelemetry/exporters/elasticsearch/es_log_recordable.h"
# include "opentelemetry/sdk_config.h"
Expand Down Expand Up @@ -127,10 +128,10 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(
const nostd::span<std::unique_ptr<sdklogs::Recordable>> &records) noexcept
{
// Return failure if this exporter has been shutdown
if (is_shutdown_)
if (isShutdown())
{

OTEL_INTERNAL_LOG_ERROR("[ES Trace Exporter] Export failed, exporter is shutdown");
OTEL_INTERNAL_LOG_ERROR("[ES Log Exporter] Exporting "
<< records.size() << " log(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}

Expand Down Expand Up @@ -199,6 +200,7 @@ sdk::common::ExportResult ElasticsearchLogExporter::Export(

bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;

// Shutdown the session manager
Expand All @@ -207,6 +209,12 @@ bool ElasticsearchLogExporter::Shutdown(std::chrono::microseconds timeout) noexc

return true;
}

bool ElasticsearchLogExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <opentelemetry/common/spin_lock_mutex.h>
#include <opentelemetry/ext/http/client/http_client.h>
#include <opentelemetry/sdk/trace/exporter.h>

Expand Down Expand Up @@ -75,6 +76,8 @@ class JaegerExporter final : public opentelemetry::sdk::trace::SpanExporter
bool is_shutdown_ = false;
JaegerExporterOptions options_;
std::unique_ptr<ThriftSender> sender_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
// For testing
friend class JaegerExporterTestPeer;
/**
Expand Down
13 changes: 12 additions & 1 deletion exporters/jaeger/src/jaeger_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
#include <agent_types.h>
#include <opentelemetry/exporters/jaeger/jaeger_exporter.h>
#include <opentelemetry/exporters/jaeger/recordable.h>
#include "opentelemetry/sdk_config.h"

#include "http_transport.h"
#include "thrift_sender.h"
#include "udp_transport.h"

#include <mutex>
#include <vector>

namespace sdk_common = opentelemetry::sdk::common;
Expand Down Expand Up @@ -39,8 +41,10 @@ std::unique_ptr<trace_sdk::Recordable> JaegerExporter::MakeRecordable() noexcept
sdk_common::ExportResult JaegerExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[Jaeger Trace Exporter] Exporting "
<< spans.size() << " span(s) failed, exporter is shutdown");
return sdk_common::ExportResult::kFailure;
}

Expand Down Expand Up @@ -91,10 +95,17 @@ void JaegerExporter::InitializeEndpoint()

bool JaegerExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

bool JaegerExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace jaeger
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

#pragma once
#include <mutex>
#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/exporters/memory/in_memory_span_data.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk_config.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
Expand Down Expand Up @@ -42,8 +45,10 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
sdk::common::ExportResult Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &recordables) noexcept override
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[In Memory Span Exporter] Exporting "
<< recordables.size() << " span(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}
for (auto &recordable : recordables)
Expand All @@ -67,6 +72,7 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
};
Expand All @@ -82,6 +88,12 @@ class InMemorySpanExporter final : public opentelemetry::sdk::trace::SpanExporte
private:
std::shared_ptr<opentelemetry::exporter::memory::InMemorySpanData> data_;
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
const bool isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
};
} // namespace memory
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once
#ifdef ENABLE_LOGS_PREVIEW

# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/nostd/type_traits.h"
# include "opentelemetry/sdk/logs/exporter.h"
# include "opentelemetry/sdk/logs/log_record.h"
Expand Down Expand Up @@ -49,6 +50,8 @@ class OStreamLogExporter final : public opentelemetry::sdk::logs::LogExporter
std::ostream &sout_;
// Whether this exporter has been shut down
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};
} // namespace logs
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/nostd/type_traits.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/span_data.h"
Expand Down Expand Up @@ -42,7 +43,9 @@ class OStreamSpanExporter final : public opentelemetry::sdk::trace::SpanExporter

private:
std::ostream &sout_;
bool isShutdown_ = false;
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;

// Mapping status number to the string from api/include/opentelemetry/trace/canonical_code.h
std::map<int, std::string> statusMap{{0, "Unset"}, {1, "Ok"}, {2, "Error"}};
Expand Down
13 changes: 12 additions & 1 deletion exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#ifdef ENABLE_LOGS_PREVIEW
# include "opentelemetry/exporters/ostream/log_exporter.h"
# include <mutex>
# include "opentelemetry/sdk_config.h"

# include <iostream>

Expand Down Expand Up @@ -107,8 +109,10 @@ std::unique_ptr<sdklogs::Recordable> OStreamLogExporter::MakeRecordable() noexce
sdk::common::ExportResult OStreamLogExporter::Export(
const nostd::span<std::unique_ptr<sdklogs::Recordable>> &records) noexcept
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[Ostream Log Exporter] Exporting "
<< records.size() << " log(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}

Expand Down Expand Up @@ -168,10 +172,17 @@ sdk::common::ExportResult OStreamLogExporter::Export(

bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

bool OStreamLogExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace logs
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Expand Down
15 changes: 12 additions & 3 deletions exporters/ostream/src/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/exporters/ostream/span_exporter.h"

#include <iostream>
#include <mutex>
#include "opentelemetry/sdk_config.h"

namespace nostd = opentelemetry::nostd;
namespace trace_sdk = opentelemetry::sdk::trace;
Expand Down Expand Up @@ -44,8 +45,10 @@ std::unique_ptr<trace_sdk::Recordable> OStreamSpanExporter::MakeRecordable() noe
sdk::common::ExportResult OStreamSpanExporter::Export(
const nostd::span<std::unique_ptr<trace_sdk::Recordable>> &spans) noexcept
{
if (isShutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[Ostream Trace Exporter] Exporting "
<< spans.size() << " span(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}

Expand Down Expand Up @@ -95,10 +98,16 @@ sdk::common::ExportResult OStreamSpanExporter::Export(

bool OStreamSpanExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
isShutdown_ = true;
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

bool OStreamSpanExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}
void OStreamSpanExporter::printAttributes(
const std::unordered_map<std::string, sdkcommon::OwnedAttributeValue> &map,
const std::string prefix)
Expand Down
5 changes: 3 additions & 2 deletions exporters/ostream/test/ostream_log_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ TEST(OStreamLogExporter, Shutdown)

// Restore original stringstream buffer
std::cout.rdbuf(original);

ASSERT_EQ(output.str(), "");
std::string err_message =
"[Ostream Log Exporter] Exporting 1 log(s) failed, exporter is shutdown";
EXPECT_TRUE(output.str().find(err_message) != std::string::npos);
}

// ---------------------------------- Print to cout -------------------------
Expand Down
5 changes: 3 additions & 2 deletions exporters/ostream/test/ostream_span_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ TEST(OStreamSpanExporter, Shutdown)
EXPECT_TRUE(processor->Shutdown());
processor->OnEnd(std::move(recordable));
});

EXPECT_EQ(captured, "");
std::string err_message =
"[Ostream Trace Exporter] Exporting 1 span(s) failed, exporter is shutdown";
EXPECT_TRUE(captured.find(err_message) != std::string::npos);
}

constexpr const char *kDefaultSpanPrinted =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/proto/collector/trace/v1/trace_service.grpc.pb.h"

#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"
Expand Down Expand Up @@ -77,6 +78,8 @@ class OtlpGrpcExporter final : public opentelemetry::sdk::trace::SpanExporter
*/
OtlpGrpcExporter(std::unique_ptr<proto::collector::trace::v1::TraceService::StubInterface> stub);
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};
} // namespace otlp
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

# include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"
# include "opentelemetry/proto/collector/logs/v1/logs_service.grpc.pb.h"
# include "opentelemetry/common/spin_lock_mutex.h"
# include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"

// clang-format on
Expand Down Expand Up @@ -78,6 +79,8 @@ class OtlpGrpcLogExporter : public opentelemetry::sdk::logs::LogExporter
*/
OtlpGrpcLogExporter(std::unique_ptr<proto::collector::logs::v1::LogsService::StubInterface> stub);
bool is_shutdown_ = false;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};

} // namespace otlp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/ext/http/client/http_client.h"
#include "opentelemetry/sdk/common/exporter_utils.h"

Expand Down Expand Up @@ -124,6 +125,8 @@ class OtlpHttpClient
std::shared_ptr<ext::http::client::HttpClient> http_client_;
// Cached parsed URI
std::string http_uri_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;
};
} // namespace otlp
} // namespace exporter
Expand Down
13 changes: 11 additions & 2 deletions exporters/otlp/src/otlp_grpc_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

#include "opentelemetry/exporters/otlp/otlp_grpc_exporter.h"
#include <mutex>
#include "opentelemetry/exporters/otlp/otlp_recordable.h"
#include "opentelemetry/exporters/otlp/otlp_recordable_utils.h"
#include "opentelemetry/ext/http/common/url_parser.h"
Expand Down Expand Up @@ -103,9 +104,10 @@ std::unique_ptr<sdk::trace::Recordable> OtlpGrpcExporter::MakeRecordable() noexc
sdk::common::ExportResult OtlpGrpcExporter::Export(
const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &spans) noexcept
{
if (is_shutdown_)
if (isShutdown())
{
OTEL_INTERNAL_LOG_ERROR("[OTLP gRPC] Export failed, exporter is shutdown");
OTEL_INTERNAL_LOG_ERROR("[OTLP gRPC] Exporting " << spans.size()
<< " span(s) failed, exporter is shutdown");
return sdk::common::ExportResult::kFailure;
}
proto::collector::trace::v1::ExportTraceServiceRequest request;
Expand Down Expand Up @@ -138,10 +140,17 @@ sdk::common::ExportResult OtlpGrpcExporter::Export(

bool OtlpGrpcExporter::Shutdown(std::chrono::microseconds timeout) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
return true;
}

bool OtlpGrpcExporter::isShutdown() const noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
return is_shutdown_;
}

} // namespace otlp
} // namespace exporter
OPENTELEMETRY_END_NAMESPACE
Loading

0 comments on commit 1688c7c

Please sign in to comment.