Skip to content

Commit

Permalink
stackdriver: some collected fixes for 0.13 release (#722)
Browse files Browse the repository at this point in the history
  • Loading branch information
djc authored Feb 7, 2022
1 parent 28d13fb commit 90f6839
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 44 deletions.
6 changes: 3 additions & 3 deletions opentelemetry-stackdriver/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "opentelemetry-stackdriver"
version = "0.13.0"
version = "0.14.0"
description = "A Rust opentelemetry exporter that uploads traces to Google Stackdriver trace."
documentation = "https://docs.rs/opentelemetry-stackdriver/"
repository = "https://github.com/open-telemetry/opentelemetry-rust"
Expand All @@ -10,8 +10,8 @@ exclude = ["/proto"]

[dependencies]
async-trait = "0.1.48"
futures = { version = "0.3", default-features = false }
gcp_auth = { version = "0.6", optional = true }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
gcp_auth = { version = "0.7", optional = true }
hex = "0.4"
http = "0.2"
hyper = "0.14.2"
Expand Down
190 changes: 149 additions & 41 deletions opentelemetry-stackdriver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use yup_oauth2::authenticator::Authenticator;

pub mod proto;

use proto::api::MonitoredResource;
use proto::devtools::cloudtrace::v2::BatchWriteSpansRequest;
use proto::devtools::cloudtrace::v2::{
span::{time_event::Annotation, Attributes, TimeEvent, TimeEvents},
Expand Down Expand Up @@ -158,21 +157,30 @@ impl Builder {
let uri = http::uri::Uri::from_static("https://cloudtrace.googleapis.com:443");

let trace_channel = Channel::builder(uri)
.tls_config(ClientTlsConfig::new())?
.tls_config(ClientTlsConfig::new())
.map_err(|e| Error::Transport(e.into()))?
.connect()
.await?;

let log_channel = Channel::builder(http::uri::Uri::from_static(
"https://logging.googleapis.com:443",
))
.tls_config(ClientTlsConfig::new())?
.connect()
.await?;

let log_client = log_context.map(|log_context| LogClient {
client: LoggingServiceV2Client::new(log_channel),
context: Arc::new(log_context),
});
.await
.map_err(|e| Error::Transport(e.into()))?;

let log_client = match log_context {
Some(log_context) => {
let log_channel = Channel::builder(http::uri::Uri::from_static(
"https://logging.googleapis.com:443",
))
.tls_config(ClientTlsConfig::new())
.map_err(|e| Error::Transport(e.into()))?
.connect()
.await
.map_err(|e| Error::Transport(e.into()))?;

Some(LogClient {
client: LoggingServiceV2Client::new(log_channel),
context: Arc::new(InternalLogContext::from(log_context)),
})
}
None => None,
};

let (tx, rx) = futures::channel::mpsc::channel(64);
let pending_count = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -280,10 +288,7 @@ where
self.authorizer.project_id(),
client.context.log_id,
),
resource: Some(MonitoredResource {
r#type: client.context.resource.r#type.clone(),
labels: client.context.resource.labels.clone(),
}),
resource: Some(client.context.resource.clone()),
severity: level as i32,
timestamp: Some(event.timestamp.into()),
labels,
Expand Down Expand Up @@ -335,9 +340,9 @@ where

self.pending_count.fetch_sub(1, Ordering::Relaxed);
if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
handle_error(TraceError::from(Error::from(e)));
handle_error(TraceError::from(Error::Authorizer(e.into())));
} else if let Err(e) = self.trace_client.batch_write_spans(req).await {
handle_error(TraceError::from(Error::TonicRpc(e)));
handle_error(TraceError::from(Error::Transport(e.into())));
}

let client = match &mut self.log_client {
Expand All @@ -361,7 +366,7 @@ where
if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
handle_error(TraceError::from(Error::from(e)));
} else if let Err(e) = client.client.write_log_entries(req).await {
handle_error(TraceError::from(Error::TonicRpc(e)));
handle_error(TraceError::from(Error::Transport(e.into())));
}
}
}
Expand Down Expand Up @@ -411,7 +416,12 @@ impl Authorizer for YupAuthorizer {
req: &mut Request<T>,
scopes: &[&str],
) -> Result<(), Self::Error> {
let token = self.authenticator.token(scopes).await?;
let token = self
.authenticator
.token(scopes)
.await
.map_err(|e| Error::Authorizer(e.into()))?;

req.metadata_mut().insert(
"authorization",
MetadataValue::from_str(&format!("Bearer {}", token.as_str())).unwrap(),
Expand All @@ -428,9 +438,16 @@ pub struct GcpAuthorizer {

#[cfg(feature = "gcp_auth")]
impl GcpAuthorizer {
pub async fn new() -> Result<Self, gcp_auth::Error> {
let manager = gcp_auth::init().await?;
let project_id = manager.project_id().await?;
pub async fn new() -> Result<Self, Error> {
let manager = gcp_auth::AuthenticationManager::new()
.await
.map_err(|e| Error::Authorizer(e.into()))?;

let project_id = manager
.project_id()
.await
.map_err(|e| Error::Authorizer(e.into()))?;

Ok(Self {
manager,
project_id,
Expand All @@ -452,18 +469,24 @@ impl Authorizer for GcpAuthorizer {
req: &mut Request<T>,
scopes: &[&str],
) -> Result<(), Self::Error> {
let token = self.manager.get_token(scopes).await?;
let token = self
.manager
.get_token(scopes)
.await
.map_err(|e| Error::Authorizer(e.into()))?;

req.metadata_mut().insert(
"authorization",
MetadataValue::from_str(&format!("Bearer {}", token.as_str())).unwrap(),
);

Ok(())
}
}

#[async_trait]
pub trait Authorizer: Sync + Send + 'static {
type Error: fmt::Display + fmt::Debug + Send;
type Error: std::error::Error + fmt::Debug + Send + Sync;

fn project_id(&self) -> &str;
async fn authorize<T: Send + Sync>(
Expand Down Expand Up @@ -498,20 +521,14 @@ fn to_truncate(s: String) -> TruncatableString {

#[derive(Debug, Error)]
pub enum Error {
#[cfg(feature = "gcp_auth")]
#[error("authorizer error: {0}")]
Gcp(#[from] gcp_auth::Error),
Authorizer(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("{0}")]
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
#[error("tonic error: {0}")]
TonicRpc(#[from] tonic::Status),
#[error("tonic error: {0}")]
TonicTransport(#[from] tonic::transport::Error),
#[cfg(feature = "yup-oauth2")]
#[error("authorizer error: {0}")]
Yup(#[from] yup_oauth2::Error),
Transport(#[source] Box<dyn std::error::Error + Send + Sync>),
}

impl ExportError for Error {
Expand All @@ -532,19 +549,110 @@ enum LogSeverity {
#[derive(Clone)]
struct LogClient {
client: LoggingServiceV2Client<Channel>,
context: Arc<LogContext>,
context: Arc<InternalLogContext>,
}

struct InternalLogContext {
log_id: String,
resource: proto::api::MonitoredResource,
}

#[derive(Clone)]
pub struct LogContext {
pub log_id: String,
pub resource: Resource,
pub resource: MonitoredResource,
}

impl From<LogContext> for InternalLogContext {
fn from(cx: LogContext) -> Self {
let mut labels = HashMap::default();
let resource = match cx.resource {
MonitoredResource::GenericNode {
project_id,
location,
namespace,
node_id,
} => {
labels.insert("project_id".to_string(), project_id);
if let Some(location) = location {
labels.insert("location".to_string(), location);
}
if let Some(namespace) = namespace {
labels.insert("namespace".to_string(), namespace);
}
if let Some(node_id) = node_id {
labels.insert("node_id".to_string(), node_id);
}

proto::api::MonitoredResource {
r#type: "generic_node".to_owned(),
labels,
}
}
MonitoredResource::GenericTask {
project_id,
location,
namespace,
job,
task_id,
} => {
labels.insert("project_id".to_owned(), project_id);
if let Some(location) = location {
labels.insert("location".to_owned(), location);
}
if let Some(namespace) = namespace {
labels.insert("namespace".to_owned(), namespace);
}
if let Some(job) = job {
labels.insert("job".to_owned(), job);
}
if let Some(task_id) = task_id {
labels.insert("task_id".to_owned(), task_id);
}

proto::api::MonitoredResource {
r#type: "generic_task".to_owned(),
labels,
}
}
MonitoredResource::Global { project_id } => {
labels.insert("project_id".to_owned(), project_id);
proto::api::MonitoredResource {
r#type: "global".to_owned(),
labels,
}
}
};

Self {
log_id: cx.log_id,
resource,
}
}
}

/// A description of a `MonitoredResource`.
///
/// Possible values are listed in the [API documentation](https://cloud.google.com/logging/docs/api/v2/resource-list).
/// Please submit an issue or pull request if you want to use a resource type not listed here.
#[derive(Clone)]
pub struct Resource {
pub r#type: String,
pub labels: HashMap<String, String>,
pub enum MonitoredResource {
Global {
project_id: String,
},
GenericNode {
project_id: String,
location: Option<String>,
namespace: Option<String>,
node_id: Option<String>,
},
GenericTask {
project_id: String,
location: Option<String>,
namespace: Option<String>,
job: Option<String>,
task_id: Option<String>,
},
}

const TRACE_APPEND: &str = "https://www.googleapis.com/auth/trace.append";
Expand Down

0 comments on commit 90f6839

Please sign in to comment.