Skip to content

Commit

Permalink
Fix timeout and concurrency layer (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
mi-yu authored Jan 3, 2024
1 parent b36b5fc commit 2dfbff6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ tower-http = { version = "0.4.0", features = ["compression-br", "compression-gzi
rdkafka = { version = "0.33.2" }
axum-jrpc = { version = "0.5.1", features = ["serde_json", "anyhow_error"] }
ord-kafka-macros = { path = "ord-kafka-macros" }
tower = { version = "0.4.13", features = ["limit"] }


[dev-dependencies]
Expand Down
17 changes: 15 additions & 2 deletions src/subcommand/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use {
},
std::{cmp::Ordering, str, sync::Arc},
tokio_stream::StreamExt,
tower::limit::concurrency::ConcurrencyLimitLayer,
tower_http::{
compression::CompressionLayer,
cors::{Any, CorsLayer},
Expand Down Expand Up @@ -172,11 +173,17 @@ pub(crate) struct Server {
help = "Timeout requests after <SECONDS> seconds. Default: 30 seconds."
)]
timeout: Option<u64>,
#[clap(long, help = "Set max concurrent connections. Default: 1024")]
max_connections: Option<usize>,
}

impl Server {
pub(crate) fn run(self, options: Options, index: Arc<Index>, handle: Handle) -> SubcommandResult {
Runtime::new()?.block_on(async {
log::debug!(
"Starting server with {} max connections",
self.max_connections.unwrap_or(1024)
);
let index_clone = index.clone();

let index_thread = thread::spawn(move || loop {
Expand Down Expand Up @@ -276,7 +283,14 @@ impl Server {
.route("/tx/:txid", get(Self::transaction))

// API routes
.route("/rpc/v1", post(rpc::handler))
.route("/rpc/v1", post(rpc::handler)
.route_layer(TimeoutLayer::new(Duration::from_secs(self.timeout.unwrap_or(30))))
.route_layer(
ConcurrencyLimitLayer::new(
self.max_connections.unwrap_or(1024),
)
)
)
.layer(axum::middleware::from_fn(middleware::tracing_layer))
.layer(Extension(index))
.layer(Extension(page_config))
Expand All @@ -295,7 +309,6 @@ impl Server {
.allow_origin(Any),
)
.layer(CompressionLayer::new())
.layer(TimeoutLayer::new(Duration::from_secs(self.timeout.unwrap_or(30))))
.with_state(server_config);

match (self.http_port(), self.https_port()) {
Expand Down

0 comments on commit 2dfbff6

Please sign in to comment.