Skip to content

Commit

Permalink
Fix address sanitizer of OtlpHttpClient and MultiLogProcessor
Browse files Browse the repository at this point in the history
Signed-off-by: owentou <owentou@tencent.com>
  • Loading branch information
owent committed Jan 4, 2022
1 parent 9665249 commit fed1f54
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 15 deletions.
16 changes: 13 additions & 3 deletions exporters/otlp/src/otlp_http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,18 @@ static void ConvertGenericMessageToJson(nlohmann::json &value,
}
}

static bool SerializeToHttpBody(http_client::Body &output, const google::protobuf::Message &message)
{
auto body_size = message.ByteSizeLong();
if (body_size > 0)
{
output.resize(body_size);
return message.SerializeWithCachedSizesToArray(
reinterpret_cast<google::protobuf::uint8 *>(&output[0]));
}
return true;
}

void ConvertGenericFieldToJson(nlohmann::json &value,
const google::protobuf::Message &message,
const google::protobuf::FieldDescriptor *field_descriptor,
Expand Down Expand Up @@ -600,9 +612,7 @@ opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export(
std::string content_type;
if (options_.content_type == HttpRequestContentType::kBinary)
{
body_vec.resize(message.ByteSizeLong());
if (message.SerializeWithCachedSizesToArray(
reinterpret_cast<google::protobuf::uint8 *>(&body_vec[0])))
if (SerializeToHttpBody(body_vec, message))
{
if (options_.console_debug)
{
Expand Down
56 changes: 44 additions & 12 deletions sdk/src/logs/multi_log_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,47 +68,79 @@ void MultiLogProcessor::OnReceive(std::unique_ptr<Recordable> &&record) noexcept

bool MultiLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
bool result = true;
auto start_time = std::chrono::system_clock::now();
auto expire_time = start_time + timeout;
// Converto nanos to prevent overflow
std::chrono::nanoseconds timeout_ns = std::chrono::nanoseconds::max();
if (std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns) > timeout)
{
timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout);
}
bool result = true;
auto start_time = std::chrono::system_clock::now();
auto overflow_checker = std::chrono::system_clock::time_point::max();
std::chrono::system_clock::time_point expire_time;
if (overflow_checker - start_time <= timeout_ns)
{
expire_time = overflow_checker;
}
else
{
expire_time =
start_time + std::chrono::duration_cast<std::chrono::system_clock::duration>(timeout_ns);
}
for (auto &processor : processors_)
{
if (!processor->ForceFlush(timeout))
if (!processor->ForceFlush(std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns)))
{
result = false;
}
start_time = std::chrono::system_clock::now();
if (expire_time > start_time)
{
timeout = std::chrono::duration_cast<std::chrono::microseconds>(expire_time - start_time);
timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(expire_time - start_time);
}
else
{
timeout = std::chrono::microseconds::zero();
timeout_ns = std::chrono::nanoseconds::zero();
}
}
return result;
}

bool MultiLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
bool result = true;
auto start_time = std::chrono::system_clock::now();
auto expire_time = start_time + timeout;
// Converto nanos to prevent overflow
std::chrono::nanoseconds timeout_ns = std::chrono::nanoseconds::max();
if (std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns) > timeout)
{
timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout);
}
bool result = true;
auto start_time = std::chrono::system_clock::now();
auto overflow_checker = std::chrono::system_clock::time_point::max();
std::chrono::system_clock::time_point expire_time;
if (overflow_checker - start_time <= timeout_ns)
{
expire_time = overflow_checker;
}
else
{
expire_time =
start_time + std::chrono::duration_cast<std::chrono::system_clock::duration>(timeout_ns);
}
for (auto &processor : processors_)
{
if (!processor->Shutdown(timeout))
if (!processor->Shutdown(std::chrono::duration_cast<std::chrono::microseconds>(timeout_ns)))
{
result = false;
}
start_time = std::chrono::system_clock::now();
if (expire_time > start_time)
{
timeout = std::chrono::duration_cast<std::chrono::microseconds>(expire_time - start_time);
timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(expire_time - start_time);
}
else
{
timeout = std::chrono::microseconds::zero();
timeout_ns = std::chrono::nanoseconds::zero();
}
}
return result;
Expand Down

0 comments on commit fed1f54

Please sign in to comment.