From 10ec3bafb4d713847e7210c234598304d24de103 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 24 Feb 2025 18:55:52 +0530 Subject: [PATCH 1/7] refactor: json events --- src/connectors/kafka/processor.rs | 36 ++++------ src/event/format/json.rs | 4 +- src/handlers/http/ingest.rs | 68 +++++++++++-------- src/handlers/http/modal/utils/ingest_utils.rs | 33 ++------- 4 files changed, 62 insertions(+), 79 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 191396660..491f7c07d 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -61,12 +61,20 @@ impl ParseableSinkProcessor { let static_schema_flag = stream.get_static_schema_flag(); let schema_version = stream.get_schema_version(); - let (json_vec, total_payload_size) = Self::json_vec(records); - let batch_json_event = json::Event { - data: Value::Array(json_vec), - }; + let mut json_vec = Vec::with_capacity(records.len()); + let mut total_payload_size = 0u64; + + for record in records.iter().filter_map(|r| r.payload.as_ref()) { + total_payload_size += record.len() as u64; + if let Ok(value) = serde_json::from_slice::(record) { + json_vec.push(value); + } + } - let (rb, is_first) = batch_json_event.into_recordbatch( + let (rb, is_first) = json::Event { + json: Value::Array(json_vec), + } + .into_recordbatch( &schema, static_schema_flag, time_partition.as_ref(), @@ -87,31 +95,17 @@ impl ParseableSinkProcessor { Ok(p_event) } - - fn json_vec(records: &[ConsumerRecord]) -> (Vec, u64) { - let mut json_vec = Vec::with_capacity(records.len()); - let mut total_payload_size = 0u64; - - for record in records.iter().filter_map(|r| r.payload.as_ref()) { - total_payload_size += record.len() as u64; - if let Ok(value) = serde_json::from_slice::(record) { - json_vec.push(value); - } - } - - (json_vec, total_payload_size) - } } #[async_trait] impl Processor, ()> for ParseableSinkProcessor { async fn process(&self, records: Vec) -> 
anyhow::Result<()> { let len = records.len(); - debug!("Processing {} records", len); + debug!("Processing {len} records"); self.build_event_from_chunk(&records).await?.process()?; - debug!("Processed {} records", len); + debug!("Processed {len} records"); Ok(()) } } diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 5006be142..369b62cfe 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -33,7 +33,7 @@ use super::EventFormat; use crate::{metadata::SchemaVersion, utils::arrow::get_field}; pub struct Event { - pub data: Value, + pub json: Value, } impl EventFormat for Event { @@ -52,7 +52,7 @@ impl EventFormat for Event { // incoming event may be a single json or a json array // but Data (type defined above) is a vector of json values // hence we need to convert the incoming event to a vector of json values - let value_arr = match self.data { + let value_arr = match self.json { Value::Array(arr) => arr, value @ Value::Object(_) => vec![value], _ => unreachable!("flatten would have failed beforehand"), diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 6706470a2..e52edbf70 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -81,7 +81,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< let size: usize = body.len(); let parsed_timestamp = Utc::now().naive_utc(); let (rb, is_first) = { - let body_val: Value = serde_json::from_slice(&body)?; + let json: Value = serde_json::from_slice(&body)?; let hash_map = PARSEABLE.streams.read().unwrap(); let schema = hash_map .get(&stream_name) @@ -91,7 +91,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< .expect(LOCK_EXPECT) .schema .clone(); - let event = format::json::Event { data: body_val }; + let event = format::json::Event { json }; // For internal streams, use old schema event.into_recordbatch(&schema, false, None, SchemaVersion::V0)? 
}; @@ -355,7 +355,7 @@ mod tests { use std::{collections::HashMap, sync::Arc}; use crate::{ - handlers::http::modal::utils::ingest_utils::into_event_batch, + event::format::{json, EventFormat}, metadata::SchemaVersion, utils::json::{convert_array_to_object, flatten::convert_to_array}, }; @@ -392,8 +392,9 @@ mod tests { "b": "hello", }); - let (rb, _) = - into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = json::Event { json } + .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 4); @@ -419,8 +420,9 @@ mod tests { "c": null }); - let (rb, _) = - into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = json::Event { json } + .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -450,7 +452,9 @@ mod tests { .into_iter(), ); - let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = json::Event { json } + .into_recordbatch(&schema, false, None, SchemaVersion::V0) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 3); @@ -480,7 +484,9 @@ mod tests { .into_iter(), ); - assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); + assert!(json::Event { json } + .into_recordbatch(&schema, false, None, SchemaVersion::V0,) + .is_err()); } #[test] @@ -496,7 +502,9 @@ mod tests { .into_iter(), ); - let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = json::Event { json } + .into_recordbatch(&schema, false, None, SchemaVersion::V0) + .unwrap(); assert_eq!(rb.num_rows(), 1); assert_eq!(rb.num_columns(), 1); @@ -535,8 +543,9 @@ mod tests { }, ]); - let (rb, _) = - into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = 
json::Event { json } + .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 4); @@ -582,8 +591,9 @@ mod tests { }, ]); - let (rb, _) = - into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = json::Event { json } + .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 4); @@ -630,7 +640,9 @@ mod tests { .into_iter(), ); - let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap(); + let (rb, _) = json::Event { json } + .into_recordbatch(&schema, false, None, SchemaVersion::V0) + .unwrap(); assert_eq!(rb.num_rows(), 3); assert_eq!(rb.num_columns(), 4); @@ -677,7 +689,9 @@ mod tests { .into_iter(), ); - assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err()); + assert!(json::Event { json } + .into_recordbatch(&schema, false, None, SchemaVersion::V0,) + .is_err()); } #[test] @@ -715,13 +729,10 @@ mod tests { ) .unwrap(); - let (rb, _) = into_event_batch( - flattened_json, - HashMap::default(), - false, - None, - SchemaVersion::V0, - ) + let (rb, _) = json::Event { + json: flattened_json, + } + .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) .unwrap(); assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 5); @@ -803,13 +814,10 @@ mod tests { ) .unwrap(); - let (rb, _) = into_event_batch( - flattened_json, - HashMap::default(), - false, - None, - SchemaVersion::V1, - ) + let (rb, _) = json::Event { + json: flattened_json, + } + .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1) .unwrap(); assert_eq!(rb.num_rows(), 4); diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 005b38a91..db804a311 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ 
b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,14 +16,13 @@ * */ -use arrow_schema::Field; use chrono::{DateTime, NaiveDateTime, Utc}; use itertools::Itertools; use opentelemetry_proto::tonic::{ logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, }; use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use crate::{ event::{ @@ -34,7 +33,6 @@ use crate::{ ingest::PostError, kinesis::{flatten_kinesis_logs, Message}, }, - metadata::SchemaVersion, otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, parseable::{StreamNotFound, PARSEABLE}, storage::StreamType, @@ -117,16 +115,16 @@ async fn push_logs( )?)?] }; - for value in data { - let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length + for json in data { + let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length let parsed_timestamp = match time_partition.as_ref() { - Some(time_partition) => get_parsed_timestamp(&value, time_partition)?, + Some(time_partition) => get_parsed_timestamp(&json, time_partition)?, _ => Utc::now().naive_utc(), }; let custom_partition_values = match custom_partition.as_ref() { Some(custom_partition) => { let custom_partitions = custom_partition.split(',').collect_vec(); - get_custom_partition_values(&value, &custom_partitions) + get_custom_partition_values(&json, &custom_partitions) } None => HashMap::new(), }; @@ -141,9 +139,8 @@ async fn push_logs( .expect(LOCK_EXPECT) .schema .clone(); - let (rb, is_first_event) = into_event_batch( - value, - schema, + let (rb, is_first_event) = json::Event { json }.into_recordbatch( + &schema, static_schema_flag, time_partition.as_ref(), schema_version, @@ -165,22 +162,6 @@ async fn push_logs( Ok(()) } -pub fn into_event_batch( - data: Value, - schema: HashMap>, - static_schema_flag: bool, - time_partition: 
Option<&String>, - schema_version: SchemaVersion, -) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let (rb, is_first) = json::Event { data }.into_recordbatch( - &schema, - static_schema_flag, - time_partition, - schema_version, - )?; - Ok((rb, is_first)) -} - pub fn get_custom_partition_values( json: &Value, custom_partition_list: &[&str], From aa37f9cd1d950ce6bbd1f2521b01c1db42e44336 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Feb 2025 00:11:13 +0530 Subject: [PATCH 2/7] refactor: `into_event` brings conversion closer --- src/connectors/kafka/processor.rs | 25 +--- src/event/format/json.rs | 123 ++++++++++++++++- src/event/format/mod.rs | 15 +- src/handlers/http/ingest.rs | 76 ++++------- src/handlers/http/modal/utils/ingest_utils.rs | 128 ++---------------- 5 files changed, 188 insertions(+), 179 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 491f7c07d..b74754003 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -16,10 +16,9 @@ * */ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use async_trait::async_trait; -use chrono::Utc; use futures_util::StreamExt; use rdkafka::consumer::{CommitMode, Consumer}; use serde_json::Value; @@ -58,6 +57,7 @@ impl ParseableSinkProcessor { let stream = PARSEABLE.get_stream(stream_name)?; let schema = stream.get_schema_raw(); let time_partition = stream.get_time_partition(); + let custom_partition = stream.get_custom_partition(); let static_schema_flag = stream.get_static_schema_flag(); let schema_version = stream.get_schema_version(); @@ -71,28 +71,17 @@ impl ParseableSinkProcessor { } } - let (rb, is_first) = json::Event { - json: Value::Array(json_vec), - } - .into_recordbatch( + let p_event = json::Event::new(Value::Array(json_vec)).into_event( + stream_name.to_string(), + total_payload_size, &schema, static_schema_flag, + custom_partition.as_ref(), time_partition.as_ref(), schema_version, 
+ StreamType::UserDefined, )?; - let p_event = ParseableEvent { - rb, - stream_name: stream_name.to_string(), - origin_format: "json", - origin_size: total_payload_size, - is_first_event: is_first, - parsed_timestamp: Utc::now().naive_utc(), - time_partition: None, - custom_partition_values: HashMap::new(), - stream_type: StreamType::UserDefined, - }; - Ok(p_event) } } diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 369b62cfe..104ac999d 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -23,6 +23,7 @@ use anyhow::anyhow; use arrow_array::RecordBatch; use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder}; use arrow_schema::{DataType, Field, Fields, Schema}; +use chrono::{DateTime, NaiveDateTime, Utc}; use datafusion::arrow::util::bit_util::round_upto_multiple_of_64; use itertools::Itertools; use serde_json::Value; @@ -30,10 +31,20 @@ use std::{collections::HashMap, sync::Arc}; use tracing::error; use super::EventFormat; -use crate::{metadata::SchemaVersion, utils::arrow::get_field}; +use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field}; pub struct Event { pub json: Value, + ingestion_time: DateTime, +} + +impl Event { + pub fn new(json: Value) -> Self { + Self { + json, + ingestion_time: Utc::now(), + } + } } impl EventFormat for Event { @@ -120,6 +131,82 @@ impl EventFormat for Event { Ok(None) => unreachable!("all records are added to one rb"), } } + + fn into_event( + self, + stream_name: String, + origin_size: u64, + storage_schema: &HashMap>, + static_schema_flag: bool, + custom_partitions: Option<&String>, + time_partition: Option<&String>, + schema_version: SchemaVersion, + stream_type: StreamType, + ) -> Result { + let custom_partition_values = match custom_partitions.as_ref() { + Some(custom_partition) => { + let custom_partitions = custom_partition.split(',').collect_vec(); + get_custom_partition_values(&self.json, &custom_partitions) + } + None => HashMap::new(), 
+ }; + + let parsed_timestamp = match time_partition { + Some(time_partition) => get_parsed_timestamp(&self.json, time_partition)?, + _ => self.ingestion_time.naive_utc(), + }; + + let (rb, is_first_event) = self.into_recordbatch( + storage_schema, + static_schema_flag, + time_partition, + schema_version, + )?; + + Ok(super::Event { + rb, + stream_name, + origin_format: "json", + origin_size, + is_first_event, + parsed_timestamp, + time_partition: None, + custom_partition_values, + stream_type, + }) + } +} + +pub fn get_custom_partition_values( + json: &Value, + custom_partition_list: &[&str], +) -> HashMap { + let mut custom_partition_values: HashMap = HashMap::new(); + for custom_partition_field in custom_partition_list { + let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned(); + let custom_partition_value = match custom_partition_value { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + }; + custom_partition_values.insert( + custom_partition_field.trim().to_string(), + custom_partition_value, + ); + } + custom_partition_values +} + +fn get_parsed_timestamp( + json: &Value, + time_partition: &str, +) -> Result { + let current_time = json + .get(time_partition) + .ok_or_else(|| anyhow!("Missing field for time partition in json: {time_partition}"))?; + let parsed_time: DateTime = serde_json::from_value(current_time.clone())?; + + Ok(parsed_time.naive_utc()) } // Returns arrow schema with the fields that are present in the request body @@ -225,3 +312,37 @@ fn valid_type(data_type: &DataType, value: &Value, schema_version: SchemaVersion } } } + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use serde_json::json; + + use super::*; + + #[test] + fn parse_time_parition_from_value() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + let expected = 
NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap(); + assert_eq!(parsed.unwrap(), expected); + } + + #[test] + fn time_parition_not_in_json() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + assert!(parsed.is_err()); + } + + #[test] + fn time_parition_not_parseable_as_datetime() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + assert!(parsed.is_err()); + } +} diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index c0a2ec323..b0397bf35 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -32,10 +32,11 @@ use serde_json::Value; use crate::{ metadata::SchemaVersion, + storage::StreamType, utils::arrow::{get_field, get_timestamp_array, replace_columns}, }; -use super::DEFAULT_TIMESTAMP_KEY; +use super::{Event, DEFAULT_TIMESTAMP_KEY}; pub mod json; @@ -172,6 +173,18 @@ pub trait EventFormat: Sized { } true } + + fn into_event( + self, + stream_name: String, + origin_size: u64, + storage_schema: &HashMap>, + static_schema_flag: bool, + custom_partitions: Option<&String>, + time_partition: Option<&String>, + schema_version: SchemaVersion, + stream_type: StreamType, + ) -> Result; } pub fn get_existing_field_names( diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index e52edbf70..1cfef1e89 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -26,6 +26,7 @@ use chrono::Utc; use http::StatusCode; use serde_json::Value; +use crate::event; use crate::event::error::EventError; use crate::event::format::{self, EventFormat, LogSource}; use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; @@ -35,7 +36,6 @@ use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; -use crate::{event, LOCK_EXPECT}; use 
super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::flatten_and_push_logs; @@ -79,34 +79,22 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Result<(), PostError> { let size: usize = body.len(); - let parsed_timestamp = Utc::now().naive_utc(); - let (rb, is_first) = { - let json: Value = serde_json::from_slice(&body)?; - let hash_map = PARSEABLE.streams.read().unwrap(); - let schema = hash_map - .get(&stream_name) - .ok_or_else(|| StreamNotFound(stream_name.clone()))? - .metadata - .read() - .expect(LOCK_EXPECT) - .schema - .clone(); - let event = format::json::Event { json }; - // For internal streams, use old schema - event.into_recordbatch(&schema, false, None, SchemaVersion::V0)? - }; - event::Event { - rb, - stream_name, - origin_format: "json", - origin_size: size as u64, - is_first_event: is_first, - parsed_timestamp, - time_partition: None, - custom_partition_values: HashMap::new(), - stream_type: StreamType::Internal, - } - .process()?; + let json: Value = serde_json::from_slice(&body)?; + let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw(); + + // For internal streams, use old schema + format::json::Event::new(json) + .into_event( + stream_name, + size as u64, + &schema, + false, + None, + None, + SchemaVersion::V0, + StreamType::Internal, + )? 
+ .process()?; Ok(()) } @@ -392,7 +380,7 @@ mod tests { "b": "hello", }); - let (rb, _) = json::Event { json } + let (rb, _) = json::Event::new(json) .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) .unwrap(); @@ -420,7 +408,7 @@ mod tests { "c": null }); - let (rb, _) = json::Event { json } + let (rb, _) = json::Event::new(json) .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) .unwrap(); @@ -452,7 +440,7 @@ mod tests { .into_iter(), ); - let (rb, _) = json::Event { json } + let (rb, _) = json::Event::new(json) .into_recordbatch(&schema, false, None, SchemaVersion::V0) .unwrap(); @@ -484,7 +472,7 @@ mod tests { .into_iter(), ); - assert!(json::Event { json } + assert!(json::Event::new(json) .into_recordbatch(&schema, false, None, SchemaVersion::V0,) .is_err()); } @@ -502,7 +490,7 @@ mod tests { .into_iter(), ); - let (rb, _) = json::Event { json } + let (rb, _) = json::Event::new(json) .into_recordbatch(&schema, false, None, SchemaVersion::V0) .unwrap(); @@ -543,7 +531,7 @@ mod tests { }, ]); - let (rb, _) = json::Event { json } + let (rb, _) = json::Event::new(json) .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) .unwrap(); @@ -591,7 +579,7 @@ mod tests { }, ]); - let (rb, _) = json::Event { json } + let (rb, _) = json::Event::new(json) .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) .unwrap(); @@ -640,7 +628,7 @@ mod tests { .into_iter(), ); - let (rb, _) = json::Event { json } + let (rb, _) = json::Event::new(json) .into_recordbatch(&schema, false, None, SchemaVersion::V0) .unwrap(); @@ -689,7 +677,7 @@ mod tests { .into_iter(), ); - assert!(json::Event { json } + assert!(json::Event::new(json) .into_recordbatch(&schema, false, None, SchemaVersion::V0,) .is_err()); } @@ -729,11 +717,9 @@ mod tests { ) .unwrap(); - let (rb, _) = json::Event { - json: flattened_json, - } - .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) - .unwrap(); + let (rb, _) = 
json::Event::new(flattened_json) + .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0) + .unwrap(); assert_eq!(rb.num_rows(), 4); assert_eq!(rb.num_columns(), 5); assert_eq!( @@ -814,9 +800,7 @@ mod tests { ) .unwrap(); - let (rb, _) = json::Event { - json: flattened_json, - } + let (rb, _) = json::Event::new(flattened_json) .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1) .unwrap(); diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index db804a311..d1e95948e 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -16,28 +16,21 @@ * */ -use chrono::{DateTime, NaiveDateTime, Utc}; -use itertools::Itertools; use opentelemetry_proto::tonic::{ logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData, }; use serde_json::Value; -use std::collections::HashMap; use crate::{ - event::{ - format::{json, EventFormat, LogSource}, - Event, - }, + event::format::{json, EventFormat, LogSource}, handlers::http::{ ingest::PostError, kinesis::{flatten_kinesis_logs, Message}, }, otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces}, - parseable::{StreamNotFound, PARSEABLE}, + parseable::PARSEABLE, storage::StreamType, utils::json::{convert_array_to_object, flatten::convert_to_array}, - LOCK_EXPECT, }; pub async fn flatten_and_push_logs( @@ -117,110 +110,19 @@ async fn push_logs( for json in data { let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length - let parsed_timestamp = match time_partition.as_ref() { - Some(time_partition) => get_parsed_timestamp(&json, time_partition)?, - _ => Utc::now().naive_utc(), - }; - let custom_partition_values = match custom_partition.as_ref() { - Some(custom_partition) => { - let custom_partitions = custom_partition.split(',').collect_vec(); - get_custom_partition_values(&json, 
&custom_partitions) - } - None => HashMap::new(), - }; - let schema = PARSEABLE - .streams - .read() - .unwrap() - .get(stream_name) - .ok_or_else(|| StreamNotFound(stream_name.to_owned()))? - .metadata - .read() - .expect(LOCK_EXPECT) - .schema - .clone(); - let (rb, is_first_event) = json::Event { json }.into_recordbatch( - &schema, - static_schema_flag, - time_partition.as_ref(), - schema_version, - )?; - - Event { - rb, - stream_name: stream_name.to_owned(), - origin_format: "json", - origin_size, - is_first_event, - parsed_timestamp, - time_partition: time_partition.clone(), - custom_partition_values, - stream_type: StreamType::UserDefined, - } - .process()?; + let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw(); + json::Event::new(json) + .into_event( + stream_name.to_owned(), + origin_size, + &schema, + static_schema_flag, + custom_partition.as_ref(), + time_partition.as_ref(), + schema_version, + StreamType::UserDefined, + )? + .process()?; } Ok(()) } - -pub fn get_custom_partition_values( - json: &Value, - custom_partition_list: &[&str], -) -> HashMap { - let mut custom_partition_values: HashMap = HashMap::new(); - for custom_partition_field in custom_partition_list { - let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned(); - let custom_partition_value = match custom_partition_value { - e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), - Value::String(s) => s, - _ => "".to_string(), - }; - custom_partition_values.insert( - custom_partition_field.trim().to_string(), - custom_partition_value, - ); - } - custom_partition_values -} - -fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result { - let current_time = json - .get(time_partition) - .ok_or_else(|| PostError::MissingTimePartition(time_partition.to_string()))?; - let parsed_time: DateTime = serde_json::from_value(current_time.clone())?; - - Ok(parsed_time.naive_utc()) -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use 
serde_json::json; - - use super::*; - - #[test] - fn parse_time_parition_from_value() { - let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); - - let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap(); - assert_eq!(parsed.unwrap(), expected); - } - - #[test] - fn time_parition_not_in_json() { - let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); - - matches!(parsed, Err(PostError::MissingTimePartition(_))); - } - - #[test] - fn time_parition_not_parseable_as_datetime() { - let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); - - matches!(parsed, Err(PostError::SerdeError(_))); - } -} From 6a19a1e9ba51598c79cee99751c530094657c0e0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Feb 2025 00:18:14 +0530 Subject: [PATCH 3/7] doc: into_event --- src/event/format/json.rs | 1 + src/event/format/mod.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 104ac999d..acc9166fe 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -132,6 +132,7 @@ impl EventFormat for Event { } } + /// Converts a JSON event into a Parseable Event fn into_event( self, stream_name: String, diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index b0397bf35..f9b5adc8a 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -174,6 +174,7 @@ pub trait EventFormat: Sized { true } + #[allow(clippy::too_many_arguments)] fn into_event( self, stream_name: String, From ac08646e9909e58d8e3fb8b5d6c134f80564cb7f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 25 Feb 2025 00:27:05 +0530 Subject: [PATCH 4/7] test: fix broken tests --- src/event/format/json.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 
acc9166fe..94c88caa6 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -333,7 +333,7 @@ mod tests { #[test] fn time_parition_not_in_json() { - let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let json = json!({"hello": "world!"}); let parsed = get_parsed_timestamp(&json, "timestamp"); assert!(parsed.is_err()); @@ -341,7 +341,7 @@ mod tests { #[test] fn time_parition_not_parseable_as_datetime() { - let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let json = json!({"timestamp": "not time"}); let parsed = get_parsed_timestamp(&json, "timestamp"); assert!(parsed.is_err()); From 71ac640084d9030bf11ef65cd7efc6d36fe4bb86 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 27 Feb 2025 20:09:59 +0530 Subject: [PATCH 5/7] doc: `get_p_timestamp` Signed-off-by: Devdutt Shenoi --- src/event/format/json.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 8b72c94d0..b1066dcad 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -50,6 +50,7 @@ impl Event { impl EventFormat for Event { type Data = Vec; + /// Returns the time at ingestion, i.e. 
the `p_timestamp` value fn get_p_timestamp(&self) -> DateTime { self.p_timestamp } From 360bdd47e8edea1bb70e7d414a4baaef119c3f0b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 28 Feb 2025 11:21:22 +0530 Subject: [PATCH 6/7] refactor: restrict to `NaiveDate` --- src/event/mod.rs | 2 +- src/metadata.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/event/mod.rs b/src/event/mod.rs index cbf0a4a5a..29a4a0899 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -81,7 +81,7 @@ impl Event { self.origin_format, self.origin_size, self.rb.num_rows(), - self.parsed_timestamp, + self.parsed_timestamp.date(), ); crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb); diff --git a/src/metadata.rs b/src/metadata.rs index a29fdfee2..f4d2e2225 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -17,7 +17,7 @@ */ use arrow_schema::{DataType, Field, Schema, TimeUnit}; -use chrono::{Local, NaiveDateTime}; +use chrono::{Local, NaiveDate}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::num::NonZeroU32; @@ -37,20 +37,20 @@ pub fn update_stats( origin: &'static str, size: u64, num_rows: usize, - parsed_timestamp: NaiveDateTime, + parsed_date: NaiveDate, ) { - let parsed_date = parsed_timestamp.date().to_string(); + let parsed_date = parsed_date.to_string(); EVENTS_INGESTED .with_label_values(&[stream_name, origin]) .add(num_rows as i64); EVENTS_INGESTED_DATE - .with_label_values(&[stream_name, origin, parsed_date.as_str()]) + .with_label_values(&[stream_name, origin, &parsed_date]) .add(num_rows as i64); EVENTS_INGESTED_SIZE .with_label_values(&[stream_name, origin]) .add(size as i64); EVENTS_INGESTED_SIZE_DATE - .with_label_values(&[stream_name, origin, parsed_date.as_str()]) + .with_label_values(&[stream_name, origin, &parsed_date]) .add(size as i64); LIFETIME_EVENTS_INGESTED .with_label_values(&[stream_name, origin]) From 69a3b4d6fa3d120d747e871d37c66e55b615f289 Mon Sep 17 00:00:00 2001 From: 
Devdutt Shenoi Date: Fri, 28 Feb 2025 12:02:40 +0530 Subject: [PATCH 7/7] doc: extractors --- src/event/format/json.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index b1066dcad..c28b701de 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -152,13 +152,13 @@ impl EventFormat for Event { let custom_partition_values = match custom_partitions.as_ref() { Some(custom_partition) => { let custom_partitions = custom_partition.split(',').collect_vec(); - get_custom_partition_values(&self.json, &custom_partitions) + extract_custom_partition_values(&self.json, &custom_partitions) } None => HashMap::new(), }; let parsed_timestamp = match time_partition { - Some(time_partition) => get_parsed_timestamp(&self.json, time_partition)?, + Some(time_partition) => extract_and_parse_time(&self.json, time_partition)?, _ => self.p_timestamp.naive_utc(), }; @@ -183,7 +183,9 @@ impl EventFormat for Event { } } -pub fn get_custom_partition_values( +/// Extracts custom partition values from provided JSON object +/// e.g. `json: {"status": 400, "msg": "Hello, World!"}, custom_partition_list: ["status"]` returns `{"status" => 400}` +pub fn extract_custom_partition_values( json: &Value, custom_partition_list: &[&str], ) -> HashMap { @@ -203,7 +205,9 @@ pub fn get_custom_partition_values( custom_partition_values } -fn get_parsed_timestamp( +/// Returns the parsed timestamp of designated time partition from json object +/// e.g. 
`json: {"timestamp": "2025-05-15T15:30:00Z"}` returns `2025-05-15T15:30:00` +fn extract_and_parse_time( json: &Value, time_partition: &str, ) -> Result { @@ -330,7 +334,7 @@ mod tests { #[test] fn parse_time_parition_from_value() { let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); + let parsed = extract_and_parse_time(&json, "timestamp"); let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap(); assert_eq!(parsed.unwrap(), expected); @@ -339,7 +343,7 @@ mod tests { #[test] fn time_parition_not_in_json() { let json = json!({"hello": "world!"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); + let parsed = extract_and_parse_time(&json, "timestamp"); assert!(parsed.is_err()); } @@ -347,7 +351,7 @@ mod tests { #[test] fn time_parition_not_parseable_as_datetime() { let json = json!({"timestamp": "not time"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); + let parsed = extract_and_parse_time(&json, "timestamp"); assert!(parsed.is_err()); }