Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,17 @@ pub struct Options {
help = "OIDC scope to request (default: openid profile email)"
)]
pub scope: String,

// event's maximum chunk age in hours
#[arg(
long,
env = "P_EVENT_MAX_CHUNK_AGE",
// Accept 0 to disallow older-than-reference events; cap to one week by default.
value_parser = clap::value_parser!(u64).range(0..=168),
default_value = "1",
help = "Max allowed age gap (in hours) between events within the same node, relative to the reference event"
)]
pub event_max_chunk_age: u64,
}

#[derive(Parser, Debug)]
Expand Down
57 changes: 47 additions & 10 deletions src/utils/json/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use std::collections::BTreeMap;
use std::num::NonZeroU32;
use std::sync::Mutex;

use chrono::{DateTime, Duration, Utc};
use serde_json::map::Map;
Expand All @@ -27,6 +28,9 @@ use thiserror::Error;

use crate::parseable::PARSEABLE;

// Global variable to track the first timestamp encountered during validation
static REFERENCE_TIMESTAMP: Mutex<Option<DateTime<Utc>>> = Mutex::new(None);

#[derive(Error, Debug)]
pub enum JsonFlattenError {
#[error("Cannot flatten this JSON")]
Expand All @@ -45,8 +49,12 @@ pub enum JsonFlattenError {
FieldNotString(String),
#[error("Field {0} is not in the correct datetime format")]
InvalidDatetimeFormat(String),
#[error("Field {0} value is more than {1} days old")]
TimestampTooOld(String, i64),
#[error("Field {0} value '{2}' is more than {1} days old")]
TimestampTooOld(String, i64, DateTime<Utc>),
#[error(
"Field {0} timestamp '{2}' is more than {1} hours older than reference timestamp '{3}'"
)]
TimestampTooOldRelative(String, i64, DateTime<Utc>, DateTime<Utc>),
#[error("Expected object in array of objects")]
ExpectedObjectInArray,
#[error("Found non-object element while flattening array of objects")]
Expand Down Expand Up @@ -169,14 +177,43 @@ pub fn validate_time_partition(
partition_key.to_owned(),
));
};
let cutoff_date = Utc::now().naive_utc() - Duration::days(limit_days);
if parsed_timestamp.naive_utc() >= cutoff_date {
Ok(())
} else {
Err(JsonFlattenError::TimestampTooOld(
partition_key.to_owned(),
limit_days,
))

// Access the global reference timestamp and handle poisoning
let mut reference_timestamp = REFERENCE_TIMESTAMP
.lock()
.unwrap_or_else(|p| p.into_inner());

match *reference_timestamp {
None => {
// First timestamp encountered - validate against cutoff date
let cutoff_ts = Utc::now() - Duration::days(limit_days);
if parsed_timestamp >= cutoff_ts {
// Set the reference timestamp
*reference_timestamp = Some(parsed_timestamp);
Ok(())
} else {
Err(JsonFlattenError::TimestampTooOld(
partition_key.to_owned(),
limit_days,
parsed_timestamp,
))
}
}
Some(ref_timestamp) => {
// Subsequent timestamps - validate they're not more than configured hours older than reference
let max_age_hours = PARSEABLE.options.event_max_chunk_age as i64;
let max_age_before_ref = ref_timestamp - Duration::hours(max_age_hours);
if parsed_timestamp >= max_age_before_ref {
Ok(())
} else {
Err(JsonFlattenError::TimestampTooOldRelative(
partition_key.to_owned(),
max_age_hours,
parsed_timestamp,
ref_timestamp,
))
}
}
}
}

Expand Down
34 changes: 0 additions & 34 deletions src/utils/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,40 +480,6 @@ mod tests {
);
}

#[test]
fn test_convert_array_to_object_with_time_partition() {
let json = json!([
{
"a": "b",
"source_time": "2025-08-01T00:00:00.000Z"
},
{
"a": "b",
"source_time": "2025-08-01T00:01:00.000Z"
}
]);

let time_partition = Some("source_time".to_string());
let result = convert_array_to_object(
json,
time_partition.as_ref(),
None,
None,
SchemaVersion::V0,
&crate::event::format::LogSource::default(),
);

assert!(result.is_ok());
let objects = result.unwrap();

// Should return 2 separate objects, not wrapped in an array
assert_eq!(objects.len(), 2);
assert_eq!(objects[0]["a"], "b");
assert_eq!(objects[0]["source_time"], "2025-08-01T00:00:00.000Z");
assert_eq!(objects[1]["a"], "b");
assert_eq!(objects[1]["source_time"], "2025-08-01T00:01:00.000Z");
}

#[test]
fn test_convert_array_to_object_without_time_partition() {
let json = json!([
Expand Down
Loading