Skip to content

Commit

Permalink
Add parameter to configure content limit for ingest api
Browse files Browse the repository at this point in the history
  • Loading branch information
kamalesh0406 committed Oct 6, 2023
1 parent b105864 commit 4bfb898
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 16 deletions.
4 changes: 2 additions & 2 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ pub struct IngestApiConfig {
pub max_queue_memory_usage: Byte,
pub max_queue_disk_usage: Byte,
pub replication_factor: usize,
pub content_length_limit: u64
pub content_length_limit: u64,
}

impl Default for IngestApiConfig {
Expand All @@ -187,7 +187,7 @@ impl Default for IngestApiConfig {
max_queue_memory_usage: Byte::from_bytes(2 * 1024 * 1024 * 1024), /* 2 GiB // TODO maybe we want more? */
max_queue_disk_usage: Byte::from_bytes(4 * 1024 * 1024 * 1024), /* 4 GiB // TODO maybe we want more? */
replication_factor: 1,
content_length_limit: 10 * 1024 * 1024 /* 10 MB */
content_length_limit: 10 * 1024 * 1024, // 10 MB
}
}
}
Expand Down
63 changes: 50 additions & 13 deletions quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use bytes::{Buf, BufMut, Bytes, BytesMut};
use quickwit_config::{INGEST_SOURCE_ID, IngestApiConfig};
use quickwit_config::{IngestApiConfig, INGEST_SOURCE_ID};
use quickwit_ingest::{
CommitType, DocBatchBuilder, FetchResponse, IngestRequest, IngestResponse, IngestService,
IngestServiceClient, IngestServiceError, TailRequest,
Expand Down Expand Up @@ -66,19 +66,21 @@ struct IngestOptions {
pub(crate) fn ingest_api_handlers(
ingest_router: IngestRouterServiceClient,
ingest_service: IngestServiceClient,
config: IngestApiConfig
config: IngestApiConfig,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
ingest_handler(ingest_service.clone(), config.clone())
.or(tail_handler(ingest_service))
.or(ingest_v2_handler(ingest_router, config))
}

fn ingest_filter(
config: IngestApiConfig
config: IngestApiConfig,
) -> impl Filter<Extract = (String, Bytes, IngestOptions), Error = Rejection> + Clone {
warp::path!(String / "ingest")
.and(warp::post())
.and(warp::body::content_length_limit(config.content_length_limit))
.and(warp::body::content_length_limit(
config.content_length_limit,
))
.and(warp::body::bytes())
.and(serde_qs::warp::query::<IngestOptions>(
serde_qs::Config::default(),
Expand All @@ -87,7 +89,7 @@ fn ingest_filter(

fn ingest_handler(
ingest_service: IngestServiceClient,
config: IngestApiConfig
config: IngestApiConfig,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
ingest_filter(config)
.and(with_arg(ingest_service))
Expand All @@ -96,11 +98,13 @@ fn ingest_handler(
}

fn ingest_v2_filter(
config: IngestApiConfig
config: IngestApiConfig,
) -> impl Filter<Extract = (String, Bytes, IngestOptions), Error = Rejection> + Clone {
warp::path!(String / "ingest-v2")
.and(warp::post())
.and(warp::body::content_length_limit(config.content_length_limit))
.and(warp::body::content_length_limit(
config.content_length_limit,
))
.and(warp::body::bytes())
.and(serde_qs::warp::query::<IngestOptions>(
serde_qs::Config::default(),
Expand All @@ -109,7 +113,7 @@ fn ingest_v2_filter(

fn ingest_v2_handler(
ingest_router: IngestRouterServiceClient,
config: IngestApiConfig
config: IngestApiConfig,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
ingest_v2_filter(config)
.and(with_arg(ingest_router))
Expand Down Expand Up @@ -270,7 +274,8 @@ pub(crate) mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router,ingest_service, IngestApiConfig::default());
let ingest_api_handlers =
ingest_api_handlers(ingest_router, ingest_service, IngestApiConfig::default());
let resp = warp::test::request()
.path("/my-index/ingest")
.method("POST")
Expand Down Expand Up @@ -305,7 +310,8 @@ pub(crate) mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service, IngestApiConfig::default());
let ingest_api_handlers =
ingest_api_handlers(ingest_router, ingest_service, IngestApiConfig::default());
let payload = r#"
{"id": 1, "message": "push"}
{"id": 2, "message": "push"}
Expand All @@ -332,7 +338,8 @@ pub(crate) mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &config).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service, IngestApiConfig::default());
let ingest_api_handlers =
ingest_api_handlers(ingest_router, ingest_service, IngestApiConfig::default());
let resp = warp::test::request()
.path("/my-index/ingest")
.method("POST")
Expand All @@ -344,12 +351,38 @@ pub(crate) mod tests {
universe.assert_quit().await;
}

#[tokio::test]
async fn test_ingest_api_return_413_if_above_content_limit() {
let config = IngestApiConfig {
content_length_limit: 1,
..Default::default()
};
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers =
ingest_api_handlers(ingest_router, ingest_service, config.clone());
let resp = warp::test::request()
.path("/my-index/ingest")
.method("POST")
.json(&true)
.body(r#"{"id": 1, "message": "push"}"#)
.reply(&ingest_api_handlers)
.await;
assert_eq!(resp.status(), 413);
universe.assert_quit().await;
}

#[tokio::test]
async fn test_ingest_api_blocks_when_wait_is_specified() {
let (universe, _temp_dir, ingest_service_client, ingest_service_mailbox) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service_client, IngestApiConfig::default());
let ingest_api_handlers = ingest_api_handlers(
ingest_router,
ingest_service_client,
IngestApiConfig::default(),
);
let handle = tokio::spawn(async move {
let resp = warp::test::request()
.path("/my-index/ingest?commit=wait_for")
Expand Down Expand Up @@ -394,7 +427,11 @@ pub(crate) mod tests {
let (universe, _temp_dir, ingest_service_client, ingest_service_mailbox) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::mock().into();
let ingest_api_handlers = ingest_api_handlers(ingest_router, ingest_service_client, IngestApiConfig::default());
let ingest_api_handlers = ingest_api_handlers(
ingest_router,
ingest_service_client,
IngestApiConfig::default(),
);
let handle = tokio::spawn(async move {
let resp = warp::test::request()
.path("/my-index/ingest?commit=force")
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub(crate) async fn start_rest_server(
.or(ingest_api_handlers(
ingest_router,
ingest_service.clone(),
quickwit_services.node_config.ingest_api_config.clone()
quickwit_services.node_config.ingest_api_config.clone(),
))
.or(index_management_handlers(
quickwit_services.index_manager.clone(),
Expand Down

0 comments on commit 4bfb898

Please sign in to comment.