diff --git a/server/src/config.rs b/server/src/config.rs index 763a738097..e1ac3a3c26 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -13,6 +13,7 @@ use serde_derive::Deserialize; use table_engine::ANALYTIC_ENGINE_TYPE; use crate::{ + http::DEFAULT_MAX_BODY_SIZE, limiter::LimiterConfig, route::rule_based::{ClusterView, RuleList}, }; @@ -176,6 +177,7 @@ pub struct Config { pub bind_addr: String, pub mysql_port: u16, pub http_port: u16, + pub http_max_body_size: u64, pub grpc_port: u16, pub grpc_server_cq_count: usize, @@ -226,6 +228,7 @@ impl Default for Config { Self { bind_addr: String::from("127.0.0.1"), http_port: 5000, + http_max_body_size: DEFAULT_MAX_BODY_SIZE, mysql_port: 3307, grpc_port, grpc_server_cq_count: 20, diff --git a/server/src/http.rs b/server/src/http.rs index 78a7ff6ebb..ca715f06f7 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -89,7 +89,7 @@ define_result!(Error); impl reject::Reject for Error {} -const MAX_BODY_SIZE: u64 = 4096; +pub const DEFAULT_MAX_BODY_SIZE: u64 = 64 * 1024; /// Http service /// @@ -100,6 +100,7 @@ pub struct Service { instance: InstanceRef, profiler: Arc, tx: Sender<()>, + config: HttpConfig, } impl Service { @@ -137,7 +138,7 @@ impl Service { warp::path!("sql") .and(warp::post()) - .and(warp::body::content_length_limit(MAX_BODY_SIZE)) + .and(warp::body::content_length_limit(self.config.max_body_size)) .and(extract_request) .and(self.with_context()) .and(self.with_instance()) @@ -339,16 +340,16 @@ impl Service { /// Service builder pub struct Builder { - endpoint: Endpoint, + config: HttpConfig, engine_runtimes: Option>, log_runtime: Option>, instance: Option>, } impl Builder { - pub fn new(endpoint: Endpoint) -> Self { + pub fn new(config: HttpConfig) -> Self { Self { - endpoint, + config, engine_runtimes: None, log_runtime: None, instance: None, @@ -385,18 +386,21 @@ impl Builder { instance, profiler: Arc::new(Profiler::default()), tx, + config: self.config.clone(), }; - let ip_addr: IpAddr = self.endpoint.addr.parse().context(ParseIpAddr { - ip: self.endpoint.addr, + let ip_addr: IpAddr = self.config.endpoint.addr.parse().context(ParseIpAddr { + ip: self.config.endpoint.addr, })?; // Register filters to warp and rejection handler let routes = service.routes().recover(handle_rejection); - let (_addr, server) = - warp::serve(routes).bind_with_graceful_shutdown((ip_addr, self.endpoint.port), async { + let (_addr, server) = warp::serve(routes).bind_with_graceful_shutdown( + (ip_addr, self.config.endpoint.port), + async { rx.await.ok(); - }); + }, + ); // Run the service engine_runtime.bg_runtime.spawn(server); @@ -404,6 +408,13 @@ impl Builder { } } +/// Http service config +#[derive(Debug, Clone)] +pub struct HttpConfig { + pub endpoint: Endpoint, + pub max_body_size: u64, +} + #[derive(Debug, Serialize)] struct ErrorResponse { code: u16, diff --git a/server/src/server.rs b/server/src/server.rs index dbd37ae30e..d9c4c70eec 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -17,7 +17,7 @@ use table_engine::engine::{EngineRuntimes, TableEngineRef}; use crate::{ config::{Config, Endpoint}, grpc::{self, RpcServices}, - http::{self, Service}, + http::{self, HttpConfig, Service}, instance::{Instance, InstanceRef}, limiter::Limiter, mysql, @@ -255,11 +255,16 @@ impl Builder { }; // Create http config - let http_config = Endpoint { + let endpoint = Endpoint { addr: self.config.bind_addr.clone(), port: self.config.http_port, }; + let http_config = HttpConfig { + endpoint, + max_body_size: self.config.http_max_body_size, + }; + // Start http service let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?; let log_runtime = self.log_runtime.context(MissingLogRuntime)?;