Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added max async export support using separate AsyncBatchSpan/LogProcessor #1306

Merged
Merged 11 commits on
May 4, 2022
219 changes: 184 additions & 35 deletions exporters/otlp/test/otlp_http_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# include "opentelemetry/ext/http/client/http_client_factory.h"
# include "opentelemetry/ext/http/client/nosend/http_client_nosend.h"
# include "opentelemetry/ext/http/server/http_server.h"
# include "opentelemetry/sdk/trace/async_batch_span_processor.h"
# include "opentelemetry/sdk/trace/batch_span_processor.h"
# include "opentelemetry/sdk/trace/tracer_provider.h"
# include "opentelemetry/trace/provider.h"
Expand Down Expand Up @@ -85,7 +86,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
return {new OtlpHttpClient(MakeOtlpHttpClientOptions(content_type), http_client), http_client};
}

void ExportJsonIntegrationTest(bool is_async)
void ExportJsonIntegrationTest()
{
auto mock_otlp_client =
OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson);
Expand Down Expand Up @@ -114,9 +115,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
processor_opts.max_export_batch_size = 5;
processor_opts.max_queue_size = 5;
processor_opts.schedule_delay_millis = std::chrono::milliseconds(256);
# ifdef ENABLE_ASYNC_EXPORT
processor_opts.is_export_async = is_async;
# endif

auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts));
auto provider = nostd::shared_ptr<trace::TracerProvider>(
Expand All @@ -142,7 +141,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
auto mock_session =
std::static_pointer_cast<http_client::nosend::Session>(no_send_client->session_);
EXPECT_CALL(*mock_session, SendRequest)
.WillOnce([&mock_session, report_trace_id, is_async](
.WillOnce([&mock_session, report_trace_id](
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) {
auto check_json =
nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false);
Expand All @@ -161,29 +160,107 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
}

// let the otlp_http_client to continue
if (is_async)
http_client::nosend::Response response;
response.Finish(*callback.get());
});

child_span->End();
parent_span->End();

static_cast<sdk::trace::TracerProvider *>(provider.get())->ForceFlush();
}

# ifdef ENABLE_ASYNC_EXPORT
// Exercises the JSON OTLP/HTTP export path through the AsyncBatchSpanProcessor.
// The mock HTTP session completes the request asynchronously on a detached
// thread, mimicking a real asynchronous HTTP client, and the test verifies the
// exported span carries the expected trace id and custom header.
void ExportJsonIntegrationTestAsync()
{
  auto mock_otlp_client =
      OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kJson);
  auto mock_otlp_http_client = mock_otlp_client.first;
  auto client                = mock_otlp_client.second;
  auto exporter = GetExporter(std::unique_ptr<OtlpHttpClient>{mock_otlp_http_client});

  // Cover every supported scalar and vector attribute type so each JSON
  // serialization branch is exercised.
  resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"},
                                                      {"tenant.id", "test_user"}};
  resource_attributes["bool_value"]       = true;
  resource_attributes["int32_value"]      = static_cast<int32_t>(1);
  resource_attributes["uint32_value"]     = static_cast<uint32_t>(2);
  resource_attributes["int64_value"]      = static_cast<int64_t>(0x1100000000LL);
  resource_attributes["uint64_value"]     = static_cast<uint64_t>(0x1200000000ULL);
  resource_attributes["double_value"]     = static_cast<double>(3.1);
  resource_attributes["vec_bool_value"]   = std::vector<bool>{true, false, true};
  resource_attributes["vec_int32_value"]  = std::vector<int32_t>{1, 2};
  resource_attributes["vec_uint32_value"] = std::vector<uint32_t>{3, 4};
  resource_attributes["vec_int64_value"]  = std::vector<int64_t>{5, 6};
  resource_attributes["vec_uint64_value"] = std::vector<uint64_t>{7, 8};
  resource_attributes["vec_double_value"] = std::vector<double>{3.2, 3.3};
  resource_attributes["vec_string_value"] = std::vector<std::string>{"vector", "string"};
  auto resource = resource::Resource::Create(resource_attributes);

  // Small batch/queue limits and a short schedule delay keep the test fast
  // while still driving the processor through its batching logic.
  auto processor_opts                  = sdk::trace::AsyncBatchSpanProcessorOptions();
  processor_opts.max_export_batch_size = 5;
  processor_opts.max_queue_size        = 5;
  processor_opts.schedule_delay_millis = std::chrono::milliseconds(256);

  auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
      new sdk::trace::AsyncBatchSpanProcessor(std::move(exporter), processor_opts));
  auto provider = nostd::shared_ptr<trace::TracerProvider>(
      new sdk::trace::TracerProvider(std::move(processor), resource));

  std::string report_trace_id;

  char trace_id_hex[2 * trace_api::TraceId::kSize] = {0};
  auto tracer                                      = provider->GetTracer("test");
  auto parent_span                                 = tracer->StartSpan("Test parent span");

  trace_api::StartSpanOptions child_span_opts = {};
  child_span_opts.parent                      = parent_span->GetContext();

  auto child_span = tracer->StartSpan("Test child span", child_span_opts);

  // Record the parent trace id (lower-case base16) to compare against what the
  // exporter actually serializes.
  nostd::get<trace_api::SpanContext>(child_span_opts.parent)
      .trace_id()
      .ToLowerBase16(MakeSpan(trace_id_hex));
  report_trace_id.assign(trace_id_hex, sizeof(trace_id_hex));

  auto no_send_client = std::static_pointer_cast<http_client::nosend::HttpClient>(client);
  auto mock_session =
      std::static_pointer_cast<http_client::nosend::Session>(no_send_client->session_);
  EXPECT_CALL(*mock_session, SendRequest)
      .WillOnce([&mock_session, report_trace_id](
                    std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) {
        auto check_json =
            nlohmann::json::parse(mock_session->GetRequest()->body_, nullptr, false);
        auto resource_span = *check_json["resource_spans"].begin();
        auto instrumentation_library_span =
            *resource_span["instrumentation_library_spans"].begin();
        auto span              = *instrumentation_library_span["spans"].begin();
        auto received_trace_id = span["trace_id"].get<std::string>();
        EXPECT_EQ(received_trace_id, report_trace_id);

        auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key");
        ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end());
        if (custom_header != mock_session->GetRequest()->headers_.end())
        {
          EXPECT_EQ("Custom-Header-Value", custom_header->second);
        }

        // Let the otlp_http_client continue: finish the response on a detached
        // thread after a short delay, the way a real async client would. The
        // lambda captures the callback shared_ptr by copy so it stays alive
        // past the end of this WillOnce body.
        std::thread async_finish{[callback]() {
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
          http_client::nosend::Response response;
          response.Finish(*callback.get());
        }};
        async_finish.detach();
      });

  child_span->End();
  parent_span->End();

  // Block until the processor drains its queue so the mocked SendRequest above
  // is invoked (and the detached finisher scheduled) before the test returns.
  static_cast<sdk::trace::TracerProvider *>(provider.get())->ForceFlush();
}
# endif

void ExportBinaryIntegrationTest(bool is_async)
void ExportBinaryIntegrationTest()
{
auto mock_otlp_client =
OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary);
Expand Down Expand Up @@ -212,9 +289,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
processor_opts.max_export_batch_size = 5;
processor_opts.max_queue_size = 5;
processor_opts.schedule_delay_millis = std::chrono::milliseconds(256);
# ifdef ENABLE_ASYNC_EXPORT
processor_opts.is_export_async = is_async;
# endif

auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
new sdk::trace::BatchSpanProcessor(std::move(exporter), processor_opts));
auto provider = nostd::shared_ptr<trace::TracerProvider>(
Expand All @@ -239,7 +314,7 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
auto mock_session =
std::static_pointer_cast<http_client::nosend::Session>(no_send_client->session_);
EXPECT_CALL(*mock_session, SendRequest)
.WillOnce([&mock_session, report_trace_id, is_async](
.WillOnce([&mock_session, report_trace_id](
std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) {
opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body;
request_body.ParseFromArray(&mock_session->GetRequest()->body_[0],
Expand All @@ -255,53 +330,127 @@ class OtlpHttpExporterTestPeer : public ::testing::Test
EXPECT_EQ("Custom-Header-Value", custom_header->second);
}

// let the otlp_http_client to continue
if (is_async)
http_client::nosend::Response response;
response.Finish(*callback.get());
});

child_span->End();
parent_span->End();

static_cast<sdk::trace::TracerProvider *>(provider.get())->ForceFlush();
}

# ifdef ENABLE_ASYNC_EXPORT
// Exercises the binary (protobuf) OTLP/HTTP export path through the
// AsyncBatchSpanProcessor. The mock HTTP session completes the request on a
// detached thread, mimicking a real asynchronous HTTP client, and the test
// verifies the exported span carries the expected trace id and custom header.
void ExportBinaryIntegrationTestAsync()
{
  auto mock_otlp_client =
      OtlpHttpExporterTestPeer::GetMockOtlpHttpClient(HttpRequestContentType::kBinary);
  auto mock_otlp_http_client = mock_otlp_client.first;
  auto client                = mock_otlp_client.second;
  auto exporter = GetExporter(std::unique_ptr<OtlpHttpClient>{mock_otlp_http_client});

  // Cover every supported scalar and vector attribute type so each protobuf
  // serialization branch is exercised.
  resource::ResourceAttributes resource_attributes = {{"service.name", "unit_test_service"},
                                                      {"tenant.id", "test_user"}};
  resource_attributes["bool_value"]       = true;
  resource_attributes["int32_value"]      = static_cast<int32_t>(1);
  resource_attributes["uint32_value"]     = static_cast<uint32_t>(2);
  resource_attributes["int64_value"]      = static_cast<int64_t>(0x1100000000LL);
  resource_attributes["uint64_value"]     = static_cast<uint64_t>(0x1200000000ULL);
  resource_attributes["double_value"]     = static_cast<double>(3.1);
  resource_attributes["vec_bool_value"]   = std::vector<bool>{true, false, true};
  resource_attributes["vec_int32_value"]  = std::vector<int32_t>{1, 2};
  resource_attributes["vec_uint32_value"] = std::vector<uint32_t>{3, 4};
  resource_attributes["vec_int64_value"]  = std::vector<int64_t>{5, 6};
  resource_attributes["vec_uint64_value"] = std::vector<uint64_t>{7, 8};
  resource_attributes["vec_double_value"] = std::vector<double>{3.2, 3.3};
  resource_attributes["vec_string_value"] = std::vector<std::string>{"vector", "string"};
  auto resource = resource::Resource::Create(resource_attributes);

  // Small batch/queue limits and a short schedule delay keep the test fast
  // while still driving the processor through its batching logic.
  auto processor_opts                  = sdk::trace::AsyncBatchSpanProcessorOptions();
  processor_opts.max_export_batch_size = 5;
  processor_opts.max_queue_size        = 5;
  processor_opts.schedule_delay_millis = std::chrono::milliseconds(256);

  auto processor = std::unique_ptr<sdk::trace::SpanProcessor>(
      new sdk::trace::AsyncBatchSpanProcessor(std::move(exporter), processor_opts));
  auto provider = nostd::shared_ptr<trace::TracerProvider>(
      new sdk::trace::TracerProvider(std::move(processor), resource));

  std::string report_trace_id;

  uint8_t trace_id_binary[trace_api::TraceId::kSize] = {0};
  auto tracer                                        = provider->GetTracer("test");
  auto parent_span                                   = tracer->StartSpan("Test parent span");

  trace_api::StartSpanOptions child_span_opts = {};
  child_span_opts.parent                      = parent_span->GetContext();

  auto child_span = tracer->StartSpan("Test child span", child_span_opts);

  // Record the parent trace id as raw bytes to compare against the protobuf
  // payload the exporter actually sends.
  nostd::get<trace_api::SpanContext>(child_span_opts.parent)
      .trace_id()
      .CopyBytesTo(MakeSpan(trace_id_binary));
  report_trace_id.assign(reinterpret_cast<char *>(trace_id_binary), sizeof(trace_id_binary));

  auto no_send_client = std::static_pointer_cast<http_client::nosend::HttpClient>(client);
  auto mock_session =
      std::static_pointer_cast<http_client::nosend::Session>(no_send_client->session_);
  EXPECT_CALL(*mock_session, SendRequest)
      .WillOnce([&mock_session, report_trace_id](
                    std::shared_ptr<opentelemetry::ext::http::client::EventHandler> callback) {
        opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest request_body;
        request_body.ParseFromArray(&mock_session->GetRequest()->body_[0],
                                    static_cast<int>(mock_session->GetRequest()->body_.size()));
        auto received_trace_id =
            request_body.resource_spans(0).instrumentation_library_spans(0).spans(0).trace_id();
        EXPECT_EQ(received_trace_id, report_trace_id);

        auto custom_header = mock_session->GetRequest()->headers_.find("Custom-Header-Key");
        ASSERT_TRUE(custom_header != mock_session->GetRequest()->headers_.end());
        if (custom_header != mock_session->GetRequest()->headers_.end())
        {
          EXPECT_EQ("Custom-Header-Value", custom_header->second);
        }

        // Let the otlp_http_client continue: finish the response on a detached
        // thread after a short delay, the way a real async client would. The
        // lambda captures the callback shared_ptr by copy so it stays alive
        // past the end of this WillOnce body.
        std::thread async_finish{[callback]() {
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
          http_client::nosend::Response response;
          response.Finish(*callback.get());
        }};
        async_finish.detach();
      });

  child_span->End();
  parent_span->End();

  // Block until the processor drains its queue so the mocked SendRequest above
  // is invoked (and the detached finisher scheduled) before the test returns.
  static_cast<sdk::trace::TracerProvider *>(provider.get())->ForceFlush();
}
# endif
};

// Create spans, let processor call Export()
// Synchronous JSON export: the BatchSpanProcessor drives Export() directly.
// The stale pre-refactor call ExportJsonIntegrationTest(false) is removed; the
// helper no longer takes an is_async flag.
TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestSync)
{
  ExportJsonIntegrationTest();
}

# ifdef ENABLE_ASYNC_EXPORT
// Asynchronous JSON export via the AsyncBatchSpanProcessor. The stale
// pre-refactor call ExportJsonIntegrationTest(true) is removed; the async
// variant now has its own dedicated helper.
TEST_F(OtlpHttpExporterTestPeer, ExportJsonIntegrationTestAsync)
{
  ExportJsonIntegrationTestAsync();
}
# endif

// Create spans, let processor call Export()
// Synchronous binary (protobuf) export: the BatchSpanProcessor drives Export()
// directly. The stale pre-refactor call ExportBinaryIntegrationTest(false) is
// removed; the helper no longer takes an is_async flag.
TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestSync)
{
  ExportBinaryIntegrationTest();
}

# ifdef ENABLE_ASYNC_EXPORT
// Asynchronous binary (protobuf) export via the AsyncBatchSpanProcessor. The
// stale pre-refactor call ExportBinaryIntegrationTest(true) is removed; the
// async variant now has its own dedicated helper.
TEST_F(OtlpHttpExporterTestPeer, ExportBinaryIntegrationTestAsync)
{
  ExportBinaryIntegrationTestAsync();
}
# endif

Expand Down
27 changes: 18 additions & 9 deletions exporters/otlp/test/otlp_http_log_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# include "opentelemetry/ext/http/client/nosend/http_client_nosend.h"
# include "opentelemetry/ext/http/server/http_server.h"
# include "opentelemetry/logs/provider.h"
# include "opentelemetry/sdk/logs/async_batch_log_processor.h"
# include "opentelemetry/sdk/logs/batch_log_processor.h"
# include "opentelemetry/sdk/logs/exporter.h"
# include "opentelemetry/sdk/logs/log_record.h"
Expand Down Expand Up @@ -103,13 +104,15 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test
std::string attribute_storage_string_value[] = {"vector", "string"};

auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5
# ifdef ENABLE_ASYNC_EXPORT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a CI build for ENABLE_ASYNC_EXPORT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will add on this PR

,
is_async
provider->AddProcessor(
std::unique_ptr<sdk::logs::LogProcessor>(new sdk::logs::AsyncBatchLogProcessor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we create one test for AddProcessor and another for AsyncBatchLogProcessor, just like exporters/otlp/test/otlp_http_exporter_test.cc ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done in the next PR

std::move(exporter), 5, std::chrono::milliseconds(256), 5)));
# else
provider->AddProcessor(
std::unique_ptr<sdk::logs::LogProcessor>(new sdk::logs::BatchLogProcessor(
std::move(exporter), 5, std::chrono::milliseconds(256), 5)));
# endif
)));

std::string report_trace_id;
std::string report_span_id;
Expand Down Expand Up @@ -213,16 +216,22 @@ class OtlpHttpLogExporterTestPeer : public ::testing::Test
double attribute_storage_double_value[] = {3.2, 3.3};
std::string attribute_storage_string_value[] = {"vector", "string"};

auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
# ifdef ENABLE_ASYNC_EXPORT
sdk::logs::AsyncBatchLogProcessorOptions processor_options;
processor_options.max_export_batch_size = 5;
processor_options.max_queue_size = 5;
processor_options.schedule_delay_millis = std::chrono::milliseconds(256);
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::AsyncBatchLogProcessor(std::move(exporter), processor_options)));
# else
sdk::logs::BatchLogProcessorOptions processor_options;
processor_options.max_export_batch_size = 5;
processor_options.max_queue_size = 5;
processor_options.schedule_delay_millis = std::chrono::milliseconds(256);
# ifdef ENABLE_ASYNC_EXPORT
processor_options.is_export_async = is_async;
# endif
auto provider = nostd::shared_ptr<sdk::logs::LoggerProvider>(new sdk::logs::LoggerProvider());
provider->AddProcessor(std::unique_ptr<sdk::logs::LogProcessor>(
new sdk::logs::BatchLogProcessor(std::move(exporter), processor_options)));
# endif

std::string report_trace_id;
std::string report_span_id;
Expand Down
Loading