Skip to content

Commit

Permalink
Cocurrency otlp http session (#1274)
Browse files Browse the repository at this point in the history
  • Loading branch information
owent authored Mar 21, 2022
1 parent 31d888a commit c3eaa9d
Show file tree
Hide file tree
Showing 17 changed files with 662 additions and 249 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Increment the:
## [Unreleased]

* [EXPORTER] Jaeger Exporter - Populate Span Links ([#1251](https://github.com/open-telemetry/opentelemetry-cpp/pull/1251))
* [EXPORTER] OTLP http exporter allow concurrency session ([#1209](https://github.com/open-telemetry/opentelemetry-cpp/pull/1209))

## [1.2.0] 2022-01-31

Expand Down
34 changes: 34 additions & 0 deletions api/include/opentelemetry/common/timestamp.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,39 @@ class SteadyTimestamp
private:
int64_t nanos_since_epoch_;
};

class DurationUtil
{
public:
template <class Rep, class Period>
static std::chrono::duration<Rep, Period> AdjustWaitForTimeout(
std::chrono::duration<Rep, Period> timeout,
std::chrono::duration<Rep, Period> indefinite_value) noexcept
{
// Do not call now() when this duration is max value, now() may have a expensive cost.
if (timeout == std::chrono::duration<Rep, Period>::max())
{
return indefinite_value;
}

// std::future<T>::wait_for, std::this_thread::sleep_for, and std::condition_variable::wait_for
// may use steady_clock or system_clock.We need make sure now() + timeout do not overflow.
auto max_timeout = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
std::chrono::steady_clock::time_point::max() - std::chrono::steady_clock::now());
if (timeout >= max_timeout)
{
return indefinite_value;
}
max_timeout = std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(
std::chrono::system_clock::time_point::max() - std::chrono::system_clock::now());
if (timeout >= max_timeout)
{
return indefinite_value;
}

This comment has been minimized.

Copy link
@DebajitDas

DebajitDas Mar 23, 2022

Member

@owent Am I missing something here? This code (195- 200) is repeated

This comment has been minimized.

Copy link
@owent

owent Mar 23, 2022

Author Member

189-195 check steady_clock while 195- 200 check system_clock

This comment has been minimized.

Copy link
@DebajitDas

DebajitDas Mar 23, 2022

Member

ohh thanks


return timeout;
}
};

} // namespace common
OPENTELEMETRY_END_NAMESPACE
2 changes: 1 addition & 1 deletion exporters/ostream/src/log_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ sdk::common::ExportResult OStreamLogExporter::Export(
return sdk::common::ExportResult::kSuccess;
}

bool OStreamLogExporter::Shutdown(std::chrono::microseconds timeout) noexcept
bool OStreamLogExporter::Shutdown(std::chrono::microseconds) noexcept
{
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
is_shutdown_ = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@

#include "opentelemetry/exporters/otlp/otlp_environment.h"

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace exporter
Expand Down Expand Up @@ -71,20 +76,25 @@ struct OtlpHttpClientOptions
// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;

inline OtlpHttpClientOptions(nostd::string_view input_url,
HttpRequestContentType input_content_type,
JsonBytesMappingKind input_json_bytes_mapping,
bool input_use_json_name,
bool input_console_debug,
std::chrono::system_clock::duration input_timeout,
const OtlpHeaders &input_http_headers)
const OtlpHeaders &input_http_headers,
std::size_t input_concurrent_sessions = 8)
: url(input_url),
content_type(input_content_type),
json_bytes_mapping(input_json_bytes_mapping),
use_json_name(input_use_json_name),
console_debug(input_console_debug),
timeout(input_timeout),
http_headers(input_http_headers)
http_headers(input_http_headers),
concurrent_sessions(input_concurrent_sessions)
{}
};

Expand All @@ -99,6 +109,8 @@ class OtlpHttpClient
*/
explicit OtlpHttpClient(OtlpHttpClientOptions &&options);

~OtlpHttpClient();

/**
* Export
* @param message message to export, it should be ExportTraceServiceRequest,
Expand All @@ -114,19 +126,33 @@ class OtlpHttpClient
*/
bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept;

/**
* @brief Release the lifetime of specify session.
*
* @param session the session to release
*/
void ReleaseSession(const opentelemetry::ext::http::client::Session &session) noexcept;

private:
// Stores if this HTTP client had its Shutdown() method called
bool is_shutdown_ = false;
/**
* Add http session and hold it's lifetime.
* @param session the session to add
* @param event_handle the event handle of this session
*/
void addSession(
std::shared_ptr<opentelemetry::ext::http::client::Session> session,
std::unique_ptr<opentelemetry::ext::http::client::EventHandler> &&event_handle) noexcept;

// The configuration options associated with this HTTP client.
const OtlpHttpClientOptions options_;
/**
* @brief Real delete all sessions and event handles.
* @note This function is called in the same thread where we create sessions and handles
*
* @return return true if there are more sessions to delete
*/
bool cleanupGCSessions() noexcept;

// Object that stores the HTTP sessions that have been created
std::shared_ptr<ext::http::client::HttpClient> http_client_;
// Cached parsed URI
std::string http_uri_;
mutable opentelemetry::common::SpinLockMutex lock_;
bool isShutdown() const noexcept;

// For testing
friend class OtlpHttpExporterTestPeer;
friend class OtlpHttpLogExporterTestPeer;
Expand All @@ -138,6 +164,51 @@ class OtlpHttpClient
*/
OtlpHttpClient(OtlpHttpClientOptions &&options,
std::shared_ptr<ext::http::client::HttpClient> http_client);

struct HttpSessionData
{
std::shared_ptr<opentelemetry::ext::http::client::Session> session;
std::unique_ptr<opentelemetry::ext::http::client::EventHandler> event_handle;

inline HttpSessionData() = default;

inline explicit HttpSessionData(
std::shared_ptr<opentelemetry::ext::http::client::Session> &&input_session,
std::unique_ptr<opentelemetry::ext::http::client::EventHandler> &&input_handle)
{
session.swap(input_session);
event_handle.swap(input_handle);
}

inline explicit HttpSessionData(HttpSessionData &&other)
{
session.swap(other.session);
event_handle.swap(other.event_handle);
}
};

// Stores if this HTTP client had its Shutdown() method called
bool is_shutdown_;

// The configuration options associated with this HTTP client.
const OtlpHttpClientOptions options_;

// Object that stores the HTTP sessions that have been created
std::shared_ptr<ext::http::client::HttpClient> http_client_;

// Cached parsed URI
std::string http_uri_;

// Running sessions and event handles
std::unordered_map<const opentelemetry::ext::http::client::Session *, HttpSessionData>
running_sessions_;
// Sessions and event handles that are waiting to be deleted
std::list<HttpSessionData> gc_sessions_;
// Lock for running_sessions_, gc_sessions_ and http_client_
std::recursive_mutex session_manager_lock_;
// Condition variable and mutex to control the concurrency count of running sessions
std::mutex session_waker_lock_;
std::condition_variable session_waker_;
};
} // namespace otlp
} // namespace exporter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "opentelemetry/exporters/otlp/otlp_environment.h"

#include <chrono>
#include <cstddef>
#include <memory>
#include <string>

Expand Down Expand Up @@ -50,6 +51,9 @@ struct OtlpHttpExporterOptions

// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# include "opentelemetry/exporters/otlp/otlp_environment.h"

# include <chrono>
# include <cstddef>
# include <memory>
# include <string>

Expand Down Expand Up @@ -50,6 +51,9 @@ struct OtlpHttpLogExporterOptions

// Additional HTTP headers
OtlpHeaders http_headers = GetOtlpDefaultLogHeaders();

// Concurrent sessions
std::size_t concurrent_sessions = 8;
};

/**
Expand Down
Loading

0 comments on commit c3eaa9d

Please sign in to comment.