From 7ee906ee132511d2e1eb938e8b3d19413c629311 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 21 Aug 2025 03:31:21 -0700 Subject: [PATCH 1/4] optimise first and latest event fetch --- src/storage/azure_blob.rs | 47 ++++++ src/storage/gcs.rs | 47 ++++++ src/storage/localfs.rs | 37 +++++ src/storage/object_storage.rs | 284 +++++++++++++++------------------- src/storage/s3.rs | 47 ++++++ 5 files changed, 300 insertions(+), 162 deletions(-) diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 34933a98d..63feabe05 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -698,6 +698,53 @@ impl ObjectStorage for BlobStore { Ok(streams) } + async fn list_hours( + &self, + stream_name: &str, + date: &str, + ) -> Result, ObjectStorageError> { + let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); + let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + let hours = resp + .common_prefixes + .iter() + .filter_map(|path| { + path.as_ref() + .strip_prefix(&format!("{}/{}/", stream_name, date)) + .and_then(|s| s.strip_suffix('/')) + .map(String::from) + }) + .filter(|dir| dir.starts_with("hour=")) + .collect(); + + Ok(hours) + } + + async fn list_minutes( + &self, + stream_name: &str, + date: &str, + hour: &str, + ) -> Result, ObjectStorageError> { + let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); + let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + let minutes = resp + .common_prefixes + .iter() + .filter_map(|path| { + path.as_ref() + .strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) + .and_then(|s| s.strip_suffix('/')) + .map(String::from) + }) + .filter(|dir| dir.starts_with("minute=")) + .collect(); + + Ok(minutes) + } + async fn list_manifest_files( &self, stream_name: &str, diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 23bae4710..7912508f1 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -605,6 +605,53 @@ impl ObjectStorage for Gcs { Ok(streams) } + async fn list_hours( + &self, + stream_name: &str, + date: &str, + ) -> Result, ObjectStorageError> { + let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); + let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + let hours = resp + .common_prefixes + .iter() + .filter_map(|path| { + path.as_ref() + .strip_prefix(&format!("{}/{}/", stream_name, date)) + .and_then(|s| s.strip_suffix('/')) + .map(String::from) + }) + .filter(|dir| dir.starts_with("hour=")) + .collect(); + + Ok(hours) + } + + async fn list_minutes( + &self, + stream_name: &str, + date: &str, + hour: &str, + ) -> Result, ObjectStorageError> { + let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); + let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + let minutes = resp + .common_prefixes + .iter() + .filter_map(|path| { + path.as_ref() + .strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) + .and_then(|s| s.strip_suffix('/')) + .map(String::from) + }) + .filter(|dir| dir.starts_with("minute=")) + .collect(); + + Ok(minutes) + } + async fn list_manifest_files( &self, stream_name: &str, diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 8e8e99541..82eca88fe 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -422,6 +422,43 @@ impl ObjectStorage for LocalFS { Ok(dates.into_iter().flatten().collect()) } + async fn list_hours( + &self, + stream_name: &str, + date: &str, + ) -> Result, ObjectStorageError> { + let path = self.root.join(stream_name).join(date); + let directories = ReadDirStream::new(fs::read_dir(&path).await?); + let entries: Vec = directories.try_collect().await?; + let entries = entries.into_iter().map(dir_name); + let hours: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?; + Ok(hours + .into_iter() + .flatten() + .filter(|dir| dir.starts_with("hour=")) + .collect()) + } + + async fn list_minutes( + &self, + stream_name: &str, + date: &str, + hour: &str, + ) -> Result, ObjectStorageError> { + let path = self.root.join(stream_name).join(date).join(hour); + // Propagate any read_dir errors instead of swallowing them + let directories = ReadDirStream::new(fs::read_dir(&path).await?); + let entries: Vec = directories.try_collect().await?; + let entries = entries.into_iter().map(dir_name); + let minutes: Vec<_> = FuturesUnordered::from_iter(entries).try_collect().await?; + // Filter down to only the "minute=" prefixed directories + Ok(minutes + .into_iter() + .flatten() + .filter(|dir| dir.starts_with("minute=")) + .collect()) + } + async fn list_manifest_files( &self, _stream_name: &str, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 1f6d39e88..4816dc3c5 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -45,7 +45,6 @@ use ulid::Ulid; use crate::alerts::AlertConfig; use crate::alerts::target::Target; -use crate::catalog::snapshot::ManifestItem; use crate::catalog::{self, manifest::Manifest, snapshot::Snapshot}; use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::event::format::LogSource; @@ -71,8 +70,6 @@ use super::{ STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, StorageMetadata, retention::Retention, }; -use crate::event::DEFAULT_TIMESTAMP_KEY; - /// Context for upload operations containing stream information pub(crate) struct UploadContext { stream: Arc, @@ -309,6 +306,17 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; + async fn list_hours( + &self, + stream_name: &str, + date: &str, + ) -> Result, ObjectStorageError>; + async fn list_minutes( + &self, + stream_name: &str, + date: &str, + hour: &str, + ) -> Result, ObjectStorageError>; async fn list_manifest_files( &self, stream_name: &str, @@ -839,6 +847,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } /// Retrieves both the first and latest event timestamps from storage for the specified stream. + /// Uses directory structure traversal instead of downloading manifest files for better performance. + /// + /// This optimized implementation avoids downloading potentially large manifest files by leveraging + /// the hierarchical directory structure (date=YYYY-MM-DD/hour=HH/minute=MM/) to derive timestamps. + /// It performs efficient list operations to find the min/max date, hour, and minute combinations, + /// then constructs the actual timestamps from this directory information. /// /// # Arguments /// @@ -852,75 +866,47 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { &self, stream_name: &str, ) -> Result<(Option, Option), ObjectStorageError> { - // Get time partition for the stream - let time_partition = if let Ok(stream) = crate::parseable::PARSEABLE.get_stream(stream_name) - { - stream.get_time_partition() - } else { - None - }; - - // Get parsed stream metadata files - let stream_jsons = self.get_stream_meta_from_storage(stream_name).await?; - - // Collect all manifest items from snapshots - let mut all_manifest_items = Vec::new(); - for stream_format in &stream_jsons { - let manifest_items = &stream_format.snapshot.manifest_list; - all_manifest_items.extend(manifest_items.iter()); - } - - if all_manifest_items.is_empty() { + // Get all available dates for the stream + let dates = self.list_dates(stream_name).await?; + if dates.is_empty() { return Ok((None, None)); } - // Find min/max in one pass - let (mut first_manifest_item, mut latest_manifest_item) = (None, None); - for &item in &all_manifest_items { - if first_manifest_item - .is_none_or(|cur: &ManifestItem| item.time_lower_bound < cur.time_lower_bound) - { - first_manifest_item = Some(item); - } - if latest_manifest_item - .is_none_or(|cur: &ManifestItem| item.time_upper_bound > cur.time_upper_bound) - { - latest_manifest_item = Some(item); - } + // Parse and sort dates to find min and max + let mut parsed_dates: Vec<_> = dates + .iter() + .filter_map(|date_str| { + // Extract date from "date=YYYY-MM-DD" format + if let Some(date_part) = date_str.strip_prefix("date=") { + chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d") + .ok() + .map(|date| (date, date_str)) + } else { + None + } + }) + .collect(); + + if parsed_dates.is_empty() { + return Ok((None, None)); } - let partition_column = time_partition.as_deref().unwrap_or(DEFAULT_TIMESTAMP_KEY); + parsed_dates.sort_by_key(|(date, _)| *date); + let min_date = &parsed_dates[0].1; + let max_date = &parsed_dates[parsed_dates.len() - 1].1; - // Extract first and latest timestamps - check if we can reuse the same manifest - let (first_timestamp, latest_timestamp) = if let (Some(first_item), Some(latest_item)) = - (first_manifest_item, latest_manifest_item) - { - if first_item.manifest_path == latest_item.manifest_path { - // Same manifest, we can get both min and max in one pass - let manifest = self - .load_manifest_from_path(&first_item.manifest_path) - .await?; - self.extract_timestamps_from_manifest(&manifest, partition_column) - } else { - // Different manifests, need to load separately - let first_ts = self - .extract_timestamp_from_manifest( - &first_item.manifest_path, - partition_column, - true, - ) - .await?; - let latest_ts = self - .extract_timestamp_from_manifest( - &latest_item.manifest_path, - partition_column, - false, - ) - .await?; - (first_ts, latest_ts) - } + // Extract timestamps for min and max dates + let first_timestamp = self + .extract_timestamp_for_date(stream_name, min_date, true) + .await?; + let latest_timestamp = if min_date == max_date { + // Same date, get max timestamp from the same date + self.extract_timestamp_for_date(stream_name, max_date, false) + .await? } else { - (None, None) + // Different dates, get max from the latest date + self.extract_timestamp_for_date(stream_name, max_date, false) + .await? }; let first_event_at = first_timestamp.map(|ts| ts.to_rfc3339()); @@ -929,111 +915,85 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok((first_event_at, latest_event_at)) } - /// Helper method to load a manifest file from object storage - async fn load_manifest_from_path( + /// Extract timestamp for a specific date by traversing the hour/minute structure + async fn extract_timestamp_for_date( &self, - manifest_path: &str, - ) -> Result { - use crate::{catalog::manifest::Manifest, query::QUERY_SESSION}; - - let object_store = QUERY_SESSION - .state() - .runtime_env() - .object_store_registry - .get_store(&self.store_url()) - .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; - - let path = object_store::path::Path::parse(manifest_path) - .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; - - let manifest_response = object_store - .get(&path) - .await - .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; - - let manifest_bytes = manifest_response - .bytes() - .await - .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; + stream_name: &str, + date: &str, + find_min: bool, + ) -> Result>, ObjectStorageError> { + // Get all hours for this date + let hours = self.list_hours(stream_name, date).await?; + if hours.is_empty() { + return Ok(None); + } - let manifest: Manifest = serde_json::from_slice(&manifest_bytes) - .map_err(|e| ObjectStorageError::UnhandledError(Box::new(e)))?; + // Find min/max hour and corresponding string without collecting all values + let (target_hour_value, target_hour_str) = hours + .iter() + .filter_map(|hour_str| { + hour_str.strip_prefix("hour=").and_then(|hour_part| { + hour_part.parse::().ok().map(|hour| (hour, hour_str)) + }) + }) + .reduce(|acc, curr| { + if find_min { + if curr.0 < acc.0 { curr } else { acc } + } else if curr.0 > acc.0 { + curr + } else { + acc + } + }) + .ok_or_else(|| ObjectStorageError::Custom("No valid hours found".to_string()))?; - Ok(manifest) - } + // Get all minutes for the target hour + let minutes = self + .list_minutes(stream_name, date, target_hour_str) + .await?; + if minutes.is_empty() { + return Ok(None); + } - /// Helper method to extract min and max timestamps from a manifest - /// Returns (min_timestamp, max_timestamp) - fn extract_timestamps_from_manifest( - &self, - manifest: &Manifest, - partition_column: &str, - ) -> (Option>, Option>) { - use crate::catalog::column::TypedStatistics; - use chrono::{DateTime, Utc}; - - let mut min_timestamp: Option> = None; - let mut max_timestamp: Option> = None; - - for file in &manifest.files { - if let Some(column) = file.columns.iter().find(|col| col.name == partition_column) - && let Some(stats) = &column.stats - { - match stats { - TypedStatistics::Int(int_stats) => { - if let Some(min_ts) = DateTime::from_timestamp_millis(int_stats.min) { - min_timestamp = Some(match min_timestamp { - Some(existing) => existing.min(min_ts), - None => min_ts, - }); - } - if let Some(max_ts) = DateTime::from_timestamp_millis(int_stats.max) { - max_timestamp = Some(match max_timestamp { - Some(existing) => existing.max(max_ts), - None => max_ts, - }); - } - } - TypedStatistics::String(str_stats) => { - if let Ok(min_ts) = DateTime::parse_from_rfc3339(&str_stats.min) { - let min_ts = min_ts.with_timezone(&Utc); - min_timestamp = Some(match min_timestamp { - Some(existing) => existing.min(min_ts), - None => min_ts, - }); - } - if let Ok(max_ts) = DateTime::parse_from_rfc3339(&str_stats.max) { - let max_ts = max_ts.with_timezone(&Utc); - max_timestamp = Some(match max_timestamp { - Some(existing) => existing.max(max_ts), - None => max_ts, - }); - } - } - _ => {} // Skip other types + // Find min/max minute directly without collecting all values + let target_minute = minutes + .iter() + .filter_map(|minute_str| { + minute_str + .strip_prefix("minute=") + .and_then(|minute_part| minute_part.parse::().ok()) + }) + .reduce(|acc, curr| { + if find_min { + if curr < acc { curr } else { acc } + } else if curr > acc { + curr + } else { + acc } - } - } + }) + .ok_or_else(|| ObjectStorageError::Custom("No valid minutes found".to_string()))?; - (min_timestamp, max_timestamp) - } + // Extract date components and construct timestamp + if let Some(date_part) = date.strip_prefix("date=") + && let Ok(parsed_date) = chrono::NaiveDate::parse_from_str(date_part, "%Y-%m-%d") + { + // Create timestamp from date, hour, and minute with seconds hardcoded to 00 + let naive_datetime = parsed_date + .and_hms_opt(target_hour_value, target_minute, 0) + .unwrap_or_else(|| { + parsed_date + .and_hms_opt(target_hour_value, target_minute, 0) + .unwrap_or_else(|| parsed_date.and_hms_opt(0, 0, 0).unwrap()) + }); - /// Helper method to extract timestamp from a manifest file - async fn extract_timestamp_from_manifest( - &self, - manifest_path: &str, - partition_column: &str, - find_min: bool, - ) -> Result>, ObjectStorageError> { - let manifest = self.load_manifest_from_path(manifest_path).await?; - let (min_timestamp, max_timestamp) = - self.extract_timestamps_from_manifest(&manifest, partition_column); + return Ok(Some(DateTime::from_naive_utc_and_offset( + naive_datetime, + Utc, + ))); + } - Ok(if find_min { - min_timestamp - } else { - max_timestamp - }) + Ok(None) } // pick a better name diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 1f150c4de..3b682a1ef 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -781,6 +781,53 @@ impl ObjectStorage for S3 { Ok(streams) } + async fn list_hours( + &self, + stream_name: &str, + date: &str, + ) -> Result, ObjectStorageError> { + let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); + let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + let hours = resp + .common_prefixes + .iter() + .filter_map(|path| { + path.as_ref() + .strip_prefix(&format!("{}/{}/", stream_name, date)) + .and_then(|s| s.strip_suffix('/')) + .map(String::from) + }) + .filter(|dir| dir.starts_with("hour=")) + .collect(); + + Ok(hours) + } + + async fn list_minutes( + &self, + stream_name: &str, + date: &str, + hour: &str, + ) -> Result, ObjectStorageError> { + let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); + let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + let minutes = resp + .common_prefixes + .iter() + .filter_map(|path| { + path.as_ref() + .strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) + .and_then(|s| s.strip_suffix('/')) + .map(String::from) + }) + .filter(|dir| dir.starts_with("minute=")) + .collect(); + + Ok(minutes) + } + async fn list_manifest_files( &self, stream_name: &str, From 935b9cfd52f1b08427c32d18f5e159945f91654f Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 21 Aug 2025 03:36:45 -0700 Subject: [PATCH 2/4] simplify --- src/storage/object_storage.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 4816dc3c5..b189298f1 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -899,15 +899,9 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let first_timestamp = self .extract_timestamp_for_date(stream_name, min_date, true) .await?; - let latest_timestamp = if min_date == max_date { - // Same date, get max timestamp from the same date - self.extract_timestamp_for_date(stream_name, max_date, false) - .await? - } else { - // Different dates, get max from the latest date - self.extract_timestamp_for_date(stream_name, max_date, false) - .await? - }; + let latest_timestamp = self + .extract_timestamp_for_date(stream_name, max_date, false) + .await?; let first_event_at = first_timestamp.map(|ts| ts.to_rfc3339()); let latest_event_at = latest_timestamp.map(|ts| ts.to_rfc3339()); From db0caef9acd89cd90f3492ea03e61506b3181c64 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 21 Aug 2025 04:31:47 -0700 Subject: [PATCH 3/4] fix object store listing --- src/storage/azure_blob.rs | 27 +++++++++++++++++++-------- src/storage/gcs.rs | 27 +++++++++++++++++++-------- src/storage/s3.rs | 31 +++++++++++++++++++++---------- 3 files changed, 59 insertions(+), 26 deletions(-) diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 63feabe05..1c6cf300b 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -710,10 +710,15 @@ impl ObjectStorage for BlobStore { .common_prefixes .iter() .filter_map(|path| { - path.as_ref() - .strip_prefix(&format!("{}/{}/", stream_name, date)) - .and_then(|s| s.strip_suffix('/')) - .map(String::from) + let path_str = path.as_ref(); + if let Some(stripped) = path_str.strip_prefix(&format!("{}/{}/", stream_name, date)) + { + // Remove trailing slash if present, otherwise use as is + let clean_path = stripped.strip_suffix('/').unwrap_or(stripped); + Some(clean_path.to_string()) + } else { + None + } }) .filter(|dir| dir.starts_with("hour=")) .collect(); @@ -734,10 +739,16 @@ impl ObjectStorage for BlobStore { .common_prefixes .iter() .filter_map(|path| { - path.as_ref() - .strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) - .and_then(|s| s.strip_suffix('/')) - .map(String::from) + let path_str = path.as_ref(); + if let Some(stripped) = + path_str.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) + { + // Remove trailing slash if present, otherwise use as is + let clean_path = stripped.strip_suffix('/').unwrap_or(stripped); + Some(clean_path.to_string()) + } else { + None + } }) .filter(|dir| dir.starts_with("minute=")) .collect(); diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 7912508f1..8171344f5 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -617,10 +617,15 @@ impl ObjectStorage for Gcs { .common_prefixes .iter() .filter_map(|path| { - path.as_ref() - .strip_prefix(&format!("{}/{}/", stream_name, date)) - .and_then(|s| s.strip_suffix('/')) - .map(String::from) + let path_str = path.as_ref(); + if let Some(stripped) = path_str.strip_prefix(&format!("{}/{}/", stream_name, date)) + { + // Remove trailing slash if present, otherwise use as is + let clean_path = stripped.strip_suffix('/').unwrap_or(stripped); + Some(clean_path.to_string()) + } else { + None + } }) .filter(|dir| dir.starts_with("hour=")) .collect(); @@ -641,10 +646,16 @@ impl ObjectStorage for Gcs { .common_prefixes .iter() .filter_map(|path| { - path.as_ref() - .strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) - .and_then(|s| s.strip_suffix('/')) - .map(String::from) + let path_str = path.as_ref(); + if let Some(stripped) = + path_str.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) + { + // Remove trailing slash if present, otherwise use as is + let clean_path = stripped.strip_suffix('/').unwrap_or(stripped); + Some(clean_path.to_string()) + } else { + None + } }) .filter(|dir| dir.starts_with("minute=")) .collect(); diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 3b682a1ef..824ab021a 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -789,14 +789,19 @@ impl ObjectStorage for S3 { let pre = object_store::path::Path::from(format!("{}/{}/", stream_name, date)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; - let hours = resp + let hours: Vec = resp .common_prefixes .iter() .filter_map(|path| { - path.as_ref() - .strip_prefix(&format!("{}/{}/", stream_name, date)) - .and_then(|s| s.strip_suffix('/')) - .map(String::from) + let path_str = path.as_ref(); + if let Some(stripped) = path_str.strip_prefix(&format!("{}/{}/", stream_name, date)) + { + // Remove trailing slash if present, otherwise use as is + let clean_path = stripped.strip_suffix('/').unwrap_or(stripped); + Some(clean_path.to_string()) + } else { + None + } }) .filter(|dir| dir.starts_with("hour=")) .collect(); @@ -813,14 +818,20 @@ impl ObjectStorage for S3 { let pre = object_store::path::Path::from(format!("{}/{}/{}/", stream_name, date, hour)); let resp = self.client.list_with_delimiter(Some(&pre)).await?; - let minutes = resp + let minutes: Vec = resp .common_prefixes .iter() .filter_map(|path| { - path.as_ref() - .strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) - .and_then(|s| s.strip_suffix('/')) - .map(String::from) + let path_str = path.as_ref(); + if let Some(stripped) = + path_str.strip_prefix(&format!("{}/{}/{}/", stream_name, date, hour)) + { + // Remove trailing slash if present, otherwise use as is + let clean_path = stripped.strip_suffix('/').unwrap_or(stripped); + Some(clean_path.to_string()) + } else { + None + } }) .filter(|dir| dir.starts_with("minute=")) .collect(); From 541744e2986695ece4471742b634699d9fd24788 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 21 Aug 2025 05:00:21 -0700 Subject: [PATCH 4/4] remove redundant logic, add comments --- src/storage/object_storage.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b189298f1..a1e987068 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -306,11 +306,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; + /// Lists the immediate “hour=” partition directories under the given date. + /// Only immediate child entries named `hour=HH` should be returned (no trailing slash). + /// `HH` must be zero-padded two-digit numerals (`"hour=00"` through `"hour=23"`). async fn list_hours( &self, stream_name: &str, date: &str, ) -> Result, ObjectStorageError>; + + /// Lists the immediate “minute=” partition directories under the given date/hour. + /// Only immediate child entries named `minute=MM` should be returned (no trailing slash). + /// `MM` must be zero-padded two-digit numerals (`"minute=00"` through `"minute=59"`). async fn list_minutes( &self, stream_name: &str, @@ -975,11 +982,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { // Create timestamp from date, hour, and minute with seconds hardcoded to 00 let naive_datetime = parsed_date .and_hms_opt(target_hour_value, target_minute, 0) - .unwrap_or_else(|| { - parsed_date - .and_hms_opt(target_hour_value, target_minute, 0) - .unwrap_or_else(|| parsed_date.and_hms_opt(0, 0, 0).unwrap()) - }); + .unwrap_or_else(|| parsed_date.and_hms_opt(0, 0, 0).unwrap()); return Ok(Some(DateTime::from_naive_utc_and_offset( naive_datetime,