diff --git a/src/cli.rs b/src/cli.rs index e8b9a387e..34cb74d7c 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -475,6 +475,17 @@ pub struct Options { help = "OIDC scope to request (default: openid profile email)" )] pub scope: String, + + // event's maximum chunk age in hours + #[arg( + long, + env = "P_EVENT_MAX_CHUNK_AGE", + // Accept 0 to disallow older-than-reference events; cap to one week by default. + value_parser = clap::value_parser!(u64).range(0..=168), + default_value = "1", + help = "Max allowed age gap (in hours) between events within the same node, relative to the reference event" + )] + pub event_max_chunk_age: u64, } #[derive(Parser, Debug)] diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index cc3df7635..2b84a43a2 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -18,6 +18,7 @@ use std::collections::BTreeMap; use std::num::NonZeroU32; +use std::sync::Mutex; use chrono::{DateTime, Duration, Utc}; use serde_json::map::Map; @@ -27,6 +28,9 @@ use thiserror::Error; use crate::parseable::PARSEABLE; +// Global variable to track the first timestamp encountered during validation +static REFERENCE_TIMESTAMP: Mutex>> = Mutex::new(None); + #[derive(Error, Debug)] pub enum JsonFlattenError { #[error("Cannot flatten this JSON")] @@ -45,8 +49,12 @@ pub enum JsonFlattenError { FieldNotString(String), #[error("Field {0} is not in the correct datetime format")] InvalidDatetimeFormat(String), - #[error("Field {0} value is more than {1} days old")] - TimestampTooOld(String, i64), + #[error("Field {0} value '{2}' is more than {1} days old")] + TimestampTooOld(String, i64, DateTime), + #[error( + "Field {0} timestamp '{2}' is more than {1} hours older than reference timestamp '{3}'" + )] + TimestampTooOldRelative(String, i64, DateTime, DateTime), #[error("Expected object in array of objects")] ExpectedObjectInArray, #[error("Found non-object element while flattening array of objects")] @@ -169,14 +177,43 @@ pub fn validate_time_partition( partition_key.to_owned(), )); }; - let cutoff_date = Utc::now().naive_utc() - Duration::days(limit_days); - if parsed_timestamp.naive_utc() >= cutoff_date { - Ok(()) - } else { - Err(JsonFlattenError::TimestampTooOld( - partition_key.to_owned(), - limit_days, - )) + + // Access the global reference timestamp and handle poisoning + let mut reference_timestamp = REFERENCE_TIMESTAMP + .lock() + .unwrap_or_else(|p| p.into_inner()); + + match *reference_timestamp { + None => { + // First timestamp encountered - validate against cutoff date + let cutoff_ts = Utc::now() - Duration::days(limit_days); + if parsed_timestamp >= cutoff_ts { + // Set the reference timestamp + *reference_timestamp = Some(parsed_timestamp); + Ok(()) + } else { + Err(JsonFlattenError::TimestampTooOld( + partition_key.to_owned(), + limit_days, + parsed_timestamp, + )) + } + } + Some(ref_timestamp) => { + // Subsequent timestamps - validate they're not more than configured hours older than reference + let max_age_hours = PARSEABLE.options.event_max_chunk_age as i64; + let max_age_before_ref = ref_timestamp - Duration::hours(max_age_hours); + if parsed_timestamp >= max_age_before_ref { + Ok(()) + } else { + Err(JsonFlattenError::TimestampTooOldRelative( + partition_key.to_owned(), + max_age_hours, + parsed_timestamp, + ref_timestamp, + )) + } + } } } diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 517345ae3..d46646ae1 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -480,40 +480,6 @@ mod tests { ); } - #[test] - fn test_convert_array_to_object_with_time_partition() { - let json = json!([ - { - "a": "b", - "source_time": "2025-08-01T00:00:00.000Z" - }, - { - "a": "b", - "source_time": "2025-08-01T00:01:00.000Z" - } - ]); - - let time_partition = Some("source_time".to_string()); - let result = convert_array_to_object( - json, - time_partition.as_ref(), - None, - None, - SchemaVersion::V0, - &crate::event::format::LogSource::default(), - ); - - assert!(result.is_ok()); - let objects = result.unwrap(); - - // Should return 2 separate objects, not wrapped in an array - assert_eq!(objects.len(), 2); - assert_eq!(objects[0]["a"], "b"); - assert_eq!(objects[0]["source_time"], "2025-08-01T00:00:00.000Z"); - assert_eq!(objects[1]["a"], "b"); - assert_eq!(objects[1]["source_time"], "2025-08-01T00:01:00.000Z"); - } - #[test] fn test_convert_array_to_object_without_time_partition() { let json = json!([