Skip to content

Commit

Permalink
chore: add more content type support for log inserter api
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Jun 5, 2024
1 parent 38ed6bb commit 62fbe31
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use pipeline::error::{CastTypeSnafu, ExecPipelineSnafu};
use pipeline::Value as PipelineValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::{Deserializer, Value};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

Expand All @@ -37,6 +37,7 @@ use crate::query_handler::LogHandlerRef;

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LogIngesterQueryParams {
pub table: Option<String>,
pub table_name: Option<String>,
pub db: Option<String>,
pub pipeline_name: Option<String>,
Expand Down Expand Up @@ -79,14 +80,28 @@ pub async fn log_ingester(
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;
let table_name = query_params.table_name.context(InvalidParameterSnafu {
reason: "table_name is required",
})?;
let table_name =
query_params
.table
.or(query_params.table_name)
.context(InvalidParameterSnafu {
reason: "table is required",
})?;

let m: mime::Mime = content_type.clone().into();
let value = match m.subtype() {
// TODO (qtang): we should decide json or jsonl
mime::JSON => serde_json::from_str(&payload).context(ParseJsonSnafu)?,
mime::JSON => {
let mut value = Deserializer::from_str(&payload)
.into_iter::<Value>()
.collect::<std::result::Result<Value, _>>()
.context(ParseJsonSnafu)?;
// check if the payload is an ndjson array
if value.is_array() && value.as_array().unwrap().len() == 1 && value[0].is_array() {
value[0].take()
} else {
value
}
}
// add more content type support
_ => UnsupportedContentTypeSnafu { content_type }.fail()?,
};
Expand Down

0 comments on commit 62fbe31

Please sign in to comment.