diff --git a/server/src/event/format.rs b/server/src/event/format.rs
index 35724bbd7..e637eb4e6 100644
--- a/server/src/event/format.rs
+++ b/server/src/event/format.rs
@@ -102,7 +102,7 @@ pub trait EventFormat: Sized {
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
             return Err(anyhow!("Schema mismatch"));
         }
-        new_schema = update_field_type_in_schema(new_schema, time_partition);
+        new_schema = update_field_type_in_schema(new_schema, None, time_partition, None);
         let rb = Self::decode(data, new_schema.clone())?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
@@ -147,19 +147,101 @@ pub trait EventFormat: Sized {
     }
 }
 
+pub fn get_existing_fields(
+    inferred_schema: Arc<Schema>,
+    existing_schema: Option<&HashMap<String, Arc<Field>>>,
+) -> Vec<Arc<Field>> {
+    let mut existing_fields = Vec::new();
+
+    for field in inferred_schema.fields.iter() {
+        if existing_schema.map_or(false, |schema| schema.contains_key(field.name())) {
+            existing_fields.push(field.clone());
+        }
+    }
+
+    existing_fields
+}
+
+pub fn get_existing_timestamp_fields(
+    existing_schema: &HashMap<String, Arc<Field>>,
+) -> Vec<Arc<Field>> {
+    let mut timestamp_fields = Vec::new();
+
+    for field in existing_schema.values() {
+        if let DataType::Timestamp(TimeUnit::Millisecond, None) = field.data_type() {
+            timestamp_fields.push(field.clone());
+        }
+    }
+
+    timestamp_fields
+}
+
+pub fn override_timestamp_fields(
+    inferred_schema: Arc<Schema>,
+    existing_timestamp_fields: &[Arc<Field>],
+) -> Arc<Schema> {
+    let timestamp_field_names: Vec<&str> = existing_timestamp_fields
+        .iter()
+        .map(|field| field.name().as_str())
+        .collect();
+
+    let updated_fields: Vec<Arc<Field>> = inferred_schema
+        .fields()
+        .iter()
+        .map(|field| {
+            if timestamp_field_names.contains(&field.name().as_str()) {
+                Arc::new(Field::new(
+                    field.name(),
+                    DataType::Timestamp(TimeUnit::Millisecond, None),
+                    field.is_nullable(),
+                ))
+            } else {
+                field.clone()
+            }
+        })
+        .collect();
+
+    Arc::new(Schema::new(updated_fields))
+}
+
 pub fn update_field_type_in_schema(
-    schema: Arc<Schema>,
+    inferred_schema: Arc<Schema>,
+    existing_schema: Option<&HashMap<String, Arc<Field>>>,
     time_partition: Option<String>,
+    log_records: Option<&Vec<Value>>,
 ) -> Arc<Schema> {
+    let mut updated_schema = inferred_schema.clone();
+
+    if let Some(existing_schema) = existing_schema {
+        let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
+        let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
+        // overriding known timestamp fields which were inferred as string fields
+        updated_schema = override_timestamp_fields(updated_schema, &existing_timestamp_fields);
+        let existing_field_names: Vec<String> = existing_fields
+            .iter()
+            .map(|field| field.name().clone())
+            .collect();
+
+        if let Some(log_records) = log_records {
+            for log_record in log_records {
+                updated_schema = Arc::new(update_data_type_to_datetime(
+                    (*updated_schema).clone(),
+                    log_record.clone(),
+                    existing_field_names.clone(),
+                ));
+            }
+        }
+    }
+
     if time_partition.is_none() {
-        return schema;
+        return updated_schema;
     }
-    let field_name = time_partition.unwrap();
-    let new_schema: Vec<Field> = schema
+    let time_partition_field_name = time_partition.unwrap();
+    let new_schema: Vec<Field> = updated_schema
         .fields()
         .iter()
         .map(|field| {
-            if *field.name() == field_name {
+            if *field.name() == time_partition_field_name {
                 if field.data_type() == &DataType::Utf8 {
                     let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
                     Field::new(field.name().clone(), new_data_type, true)
@@ -174,12 +256,16 @@ pub fn update_field_type_in_schema(
     Arc::new(Schema::new(new_schema))
 }
 
-pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema {
+pub fn update_data_type_to_datetime(
+    schema: Schema,
+    value: Value,
+    ignore_field_names: Vec<String>,
+) -> Schema {
     let new_schema: Vec<Field> = schema
         .fields()
         .iter()
         .map(|field| {
-            if field.data_type() == &DataType::Utf8 {
+            if field.data_type() == &DataType::Utf8 && !ignore_field_names.contains(field.name()) {
                 if let Value::Object(map) = &value {
                     if let Some(Value::String(s)) = map.get(field.name()) {
                         if DateTime::parse_from_rfc3339(s).is_ok() {
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index 82cd9e3aa..cedbabaa3 100644
--- a/server/src/event/format/json.rs
+++ b/server/src/event/format/json.rs
@@ -71,7 +71,9 @@ impl EventFormat for Event {
             Ok(mut infer_schema) => {
                 let new_infer_schema = super::super::format::update_field_type_in_schema(
                     Arc::new(infer_schema),
+                    Some(&stream_schema),
                     time_partition,
+                    Some(&value_arr),
                 );
                 infer_schema = Schema::new(new_infer_schema.fields().clone());
                 if let Err(err) = Schema::try_merge(vec![
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index fd2280470..2dd34d4dc 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -93,7 +93,7 @@ pub async fn list(_: HttpRequest) -> impl Responder {
 
 pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
     let body_val: Value = serde_json::from_slice(&body)?;
-    let value_arr: Vec<Value> = match body_val {
+    let log_records: Vec<Value> = match body_val {
         Value::Array(arr) => arr,
         value @ Value::Object(_) => vec![value],
         _ => {
@@ -104,9 +104,9 @@ pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
         }
     };
 
-    let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap();
-    for value in value_arr {
-        schema = update_data_type_to_datetime(schema, value);
+    let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap();
+    for log_record in log_records {
+        schema = update_data_type_to_datetime(schema, log_record, Vec::new());
     }
     Ok((web::Json(schema), StatusCode::OK))
 }