Skip to content

Commit

Permalink
feat(subscriber): support grpc-web and add grpc-web feature (#498)
Browse files Browse the repository at this point in the history
## Description

This pull request adds support for `grpc-web` to `console-subscriber`.
Once you enable this feature by calling the `enable_grpc_web` function,
you can connect the console-subscriber gRPC server using a browser
client.

## Explanation of Changes

1. Added a new feature called `grpc-web` which requires the `tonic-web`
   crate as a dependency.
2. A new API named `serve_with_grpc_web` has been introduced. It appears
   to be similar to the `serve_with` API. However, if we were to use the
   same API with `serve_with`, it would result in a bound issue. We
   attempted to combine `serve_with_grpc_web` and `serve_with`, but it
   would create a very complex trait bound for the function. Therefore,
   we decided to introduce a new API to address this problem.
4. Added a new example named `grpc_web` to show how to use the
   `into_parts` API to customize the CORS layer.

Ref #497

Signed-off-by: hi-rustin <rustin.liu@gmail.com>
Co-authored-by: Hayden Stainsby <hds@caffeineconcepts.com>
Rustin170506 and hds authored Feb 16, 2024
1 parent 28a27fc commit 4150253
Showing 5 changed files with 345 additions and 2 deletions.
47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
@@ -28,9 +28,9 @@ keywords = [
default = ["env-filter"]
parking_lot = ["parking_lot_crate", "tracing-subscriber/parking_lot"]
env-filter = ["tracing-subscriber/env-filter"]
grpc-web = ["tonic-web"]

[dependencies]

crossbeam-utils = "0.8.7"
tokio = { version = "^1.21", features = ["sync", "time", "macros", "tracing"] }
tokio-stream = { version = "0.1", features = ["net"] }
@@ -54,11 +54,20 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
crossbeam-channel = "0.5"

# Only for the web feature:
tonic-web = { version = "0.10.2", optional = true }

[dev-dependencies]
tokio = { version = "^1.21", features = ["full", "rt-multi-thread"] }
tower = { version = "0.4", default-features = false }
futures = "0.3"
http = "0.2"
tower-http = { version = "0.4", features = ["cors"] }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[[example]]
name = "grpc_web"
required-features = ["grpc-web"]
122 changes: 122 additions & 0 deletions console-subscriber/examples/grpc_web.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//! Example of using the console subscriber with tonic-web.
//! This example requires the `grpc-web` feature to be enabled.
//! Run with:
//! ```sh
//! cargo run --example grpc_web --features grpc-web
//! ```
use std::{thread, time::Duration};

use console_subscriber::{ConsoleLayer, ServerParts};
use http::header::HeaderName;
use tonic_web::GrpcWebLayer;
use tower_http::cors::{AllowOrigin, CorsLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
["grpc-status", "grpc-message", "grpc-status-details-bin"];
const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
"x-grpc-web",
"content-type",
"x-user-agent",
"grpc-timeout",
"user-agent",
];

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
thread::Builder::new()
.name("subscriber".into())
.spawn(move || {
// Do not trace anything in this thread.
let _subscriber_guard =
tracing::subscriber::set_default(tracing_core::subscriber::NoSubscriber::default());
// Custom CORS configuration.
let cors = CorsLayer::new()
.allow_origin(AllowOrigin::mirror_request())
.allow_credentials(true)
.max_age(DEFAULT_MAX_AGE)
.expose_headers(
DEFAULT_EXPOSED_HEADERS
.iter()
.cloned()
.map(HeaderName::from_static)
.collect::<Vec<HeaderName>>(),
)
.allow_headers(
DEFAULT_ALLOW_HEADERS
.iter()
.cloned()
.map(HeaderName::from_static)
.collect::<Vec<HeaderName>>(),
);
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("console subscriber runtime initialization failed");
runtime.block_on(async move {
let ServerParts {
instrument_server,
aggregator,
..
} = server.into_parts();
tokio::spawn(aggregator.run());
let router = tonic::transport::Server::builder()
// Accept gRPC-Web requests and enable CORS.
.accept_http1(true)
.layer(cors)
.layer(GrpcWebLayer::new())
.add_service(instrument_server);
let serve = router.serve(std::net::SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
// 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead.
9999,
));
serve.await.expect("console subscriber server failed");
});
})
.expect("console subscriber could not spawn thread");
tracing_subscriber::registry().with(console_layer).init();

let task1 = tokio::task::Builder::new()
.name("task1")
.spawn(spawn_tasks(1, 10))
.unwrap();
let task2 = tokio::task::Builder::new()
.name("task2")
.spawn(spawn_tasks(10, 30))
.unwrap();

let result = tokio::try_join! {
task1,
task2,
};
result?;

Ok(())
}

#[tracing::instrument]
async fn spawn_tasks(min: u64, max: u64) {
loop {
for i in min..max {
tracing::trace!(i, "spawning wait task");
tokio::task::Builder::new()
.name("wait")
.spawn(wait(i))
.unwrap();

let sleep = Duration::from_secs(max) - Duration::from_secs(i);
tracing::trace!(?sleep, "sleeping...");
tokio::time::sleep(sleep).await;
}
}
}

#[tracing::instrument]
async fn wait(seconds: u64) {
tracing::debug!("waiting...");
tokio::time::sleep(Duration::from_secs(seconds)).await;
tracing::trace!("done!");
}
40 changes: 39 additions & 1 deletion console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
@@ -56,6 +56,10 @@ pub struct Builder {
/// Any scheduled times exceeding this duration will be clamped to this
/// value. Higher values will result in more memory usage.
pub(super) scheduled_duration_max: Duration,

/// Whether to enable the grpc-web support.
#[cfg(feature = "grpc-web")]
enable_grpc_web: bool,
}

impl Default for Builder {
@@ -71,6 +75,8 @@ impl Default for Builder {
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
#[cfg(feature = "grpc-web")]
enable_grpc_web: false,
}
}
}
@@ -268,6 +274,28 @@ impl Builder {
Self { self_trace, ..self }
}

/// Sets whether to enable the grpc-web support.
///
/// By default, this is `false`. If enabled, the console subscriber will
/// serve the gRPC-Web protocol in addition to the standard gRPC protocol.
/// This is useful for serving the console subscriber to web clients.
/// Please be aware that the current default server port is set to 6669.
/// However, certain browsers may restrict this port due to security reasons.
/// If you encounter issues with this, consider changing the port to an
/// alternative one that is not commonly blocked by browsers.
///
/// [`serve_with_grpc_web`] is used to provide more advanced configuration
/// for the gRPC-Web server.
///
/// [`serve_with_grpc_web`]: crate::Server::serve_with_grpc_web
#[cfg(feature = "grpc-web")]
pub fn enable_grpc_web(self, enable_grpc_web: bool) -> Self {
Self {
enable_grpc_web,
..self
}
}

/// Completes the builder, returning a [`ConsoleLayer`] and [`Server`] task.
pub fn build(self) -> (ConsoleLayer, Server) {
ConsoleLayer::build(self)
@@ -481,6 +509,8 @@ impl Builder {
}

let self_trace = self.self_trace;
#[cfg(feature = "grpc-web")]
let enable_grpc_web = self.enable_grpc_web;

let (layer, server) = self.build();
let filter =
@@ -501,8 +531,16 @@ impl Builder {
.enable_time()
.build()
.expect("console subscriber runtime initialization failed");

runtime.block_on(async move {
#[cfg(feature = "grpc-web")]
if enable_grpc_web {
server
.serve_with_grpc_web(tonic::transport::Server::builder())
.await
.expect("console subscriber server failed");
return;
}

server
.serve()
.await
127 changes: 127 additions & 0 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
@@ -962,6 +962,133 @@ impl Server {
res?.map_err(Into::into)
}

/// Starts the gRPC service with the default gRPC settings and gRPC-Web
/// support.
///
/// # Examples
///
/// To serve the instrument server with gRPC-Web support with the default
/// settings:
///
/// ```rust
/// # async fn docs() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
/// # let (_, server) = console_subscriber::ConsoleLayer::new();
/// server.serve_with_grpc_web(tonic::transport::Server::default()).await
/// # }
/// ```
///
/// To serve the instrument server with gRPC-Web support and a custom CORS configuration, use the
/// following code:
///
/// ```rust
/// # use std::{thread, time::Duration};
/// #
/// use console_subscriber::{ConsoleLayer, ServerParts};
/// use tonic_web::GrpcWebLayer;
/// use tower_web::cors::{CorsLayer, AllowOrigin};
/// use http::header::HeaderName;
/// # use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
/// # const DEFAULT_MAX_AGE: Duration = Duration::from_secs(24 * 60 * 60);
/// # const DEFAULT_EXPOSED_HEADERS: [&str; 3] =
/// # ["grpc-status", "grpc-message", "grpc-status-details-bin"];
/// # const DEFAULT_ALLOW_HEADERS: [&str; 5] = [
/// # "x-grpc-web",
/// # "content-type",
/// # "x-user-agent",
/// # "grpc-timeout",
/// # "user-agent",
/// # ];
///
/// let (console_layer, server) = ConsoleLayer::builder().with_default_env().build();
/// # thread::Builder::new()
/// # .name("subscriber".into())
/// # .spawn(move || {
/// // Customize the CORS configuration.
/// let cors = CorsLayer::new()
/// .allow_origin(AllowOrigin::mirror_request())
/// .allow_credentials(true)
/// .max_age(DEFAULT_MAX_AGE)
/// .expose_headers(
/// DEFAULT_EXPOSED_HEADERS
/// .iter()
/// .cloned()
/// .map(HeaderName::from_static)
/// .collect::<Vec<HeaderName>>(),
/// )
/// .allow_headers(
/// DEFAULT_ALLOW_HEADERS
/// .iter()
/// .cloned()
/// .map(HeaderName::from_static)
/// .collect::<Vec<HeaderName>>(),
/// );
/// # let runtime = tokio::runtime::Builder::new_current_thread()
/// # .enable_all()
/// # .build()
/// # .expect("console subscriber runtime initialization failed");
/// # runtime.block_on(async move {
///
/// let ServerParts {
/// instrument_server,
/// aggregator,
/// ..
/// } = server.into_parts();
/// tokio::spawn(aggregator.run());
///
/// // Serve the instrument server with gRPC-Web support and the CORS configuration.
/// let router = tonic::transport::Server::builder()
/// .accept_http1(true)
/// .layer(cors)
/// .layer(GrpcWebLayer::new())
/// .add_service(instrument_server);
/// let serve = router.serve(std::net::SocketAddr::new(
/// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)),
/// // 6669 is a restricted port on Chrome, so we cannot use it. We use a different port instead.
/// 9999,
/// ));
///
/// // Finally, spawn the server.
/// serve.await.expect("console subscriber server failed");
/// # });
/// # })
/// # .expect("console subscriber could not spawn thread");
/// # tracing_subscriber::registry().with(console_layer).init();
/// ```
///
/// For a comprehensive understanding and complete code example,
/// please refer to the `grpc-web` example in the examples directory.
///
/// [`Router::serve`]: fn@tonic::transport::server::Router::serve
#[cfg(feature = "grpc-web")]
pub async fn serve_with_grpc_web(
self,
builder: tonic::transport::Server,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let addr = self.addr.clone();
let ServerParts {
instrument_server,
aggregator,
} = self.into_parts();
let router = builder
.accept_http1(true)
.add_service(tonic_web::enable(instrument_server));
let aggregate = spawn_named(aggregator.run(), "console::aggregate");
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
spawn_named(serve, "console::serve").await
}
#[cfg(unix)]
ServerAddr::Unix(path) => {
let incoming = UnixListener::bind(path)?;
let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
spawn_named(serve, "console::serve").await
}
};
aggregate.abort();
res?.map_err(Into::into)
}

/// Returns the parts needed to spawn a gRPC server and the aggregator that
/// supplies it.
///

0 comments on commit 4150253

Please sign in to comment.