From da01b19916b4976f4f04cb3fa4b3bae167943cca Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sat, 14 Jun 2025 09:04:58 -0400 Subject: [PATCH 1/8] fix: multiple fixes around system stability under load 1. perform object store sync for all streams in parallel 2. remove restriction of multi threading to utilise all available cores 3. add atomicity in conversion by - i. each conversion task processes one minute of arrows ii. move arrow files to inprocess folder to maintain atomicity iii. add a init sync task to process all pending files iv. add tokio sleep of 5 secs in shutdown task to let complete ongoing jobs v. remove unwrap of write locks to avoid thread poisoning --- src/handlers/http/health_check.rs | 37 ++- src/handlers/http/modal/query_server.rs | 8 + src/handlers/http/modal/server.rs | 8 + src/parseable/staging/reader.rs | 26 +- src/parseable/streams.rs | 378 ++++++++++++++++++------ src/storage/object_storage.rs | 147 +++++---- src/sync.rs | 90 ++++-- 7 files changed, 481 insertions(+), 213 deletions(-) diff --git a/src/handlers/http/health_check.rs b/src/handlers/http/health_check.rs index d99d62c7d..b4803e124 100644 --- a/src/handlers/http/health_check.rs +++ b/src/handlers/http/health_check.rs @@ -29,12 +29,12 @@ use actix_web::{ use http::StatusCode; use once_cell::sync::Lazy; use tokio::{sync::Mutex, task::JoinSet}; -use tracing::{error, info, warn}; +use tracing::{error, info}; -use crate::parseable::PARSEABLE; +use crate::{parseable::PARSEABLE, storage::object_storage::sync_all_streams}; // Create a global variable to store signal status -static SIGNAL_RECEIVED: Lazy>> = Lazy::new(|| Arc::new(Mutex::new(false))); +pub static SIGNAL_RECEIVED: Lazy>> = Lazy::new(|| Arc::new(Mutex::new(false))); pub async fn liveness() -> HttpResponse { HttpResponse::new(StatusCode::OK) @@ -60,28 +60,33 @@ pub async fn shutdown() { let mut shutdown_flag = SIGNAL_RECEIVED.lock().await; *shutdown_flag = true; - let mut joinset = JoinSet::new(); + //sleep for 5 secs to allow any ongoing requests to finish + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + let mut local_sync_joinset = JoinSet::new(); // Sync staging - PARSEABLE.streams.flush_and_convert(&mut joinset, true); + PARSEABLE + .streams + .flush_and_convert(&mut local_sync_joinset, false, true); - while let Some(res) = joinset.join_next().await { + while let Some(res) = local_sync_joinset.join_next().await { match res { Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), - Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"), + Ok(Err(err)) => error!("Failed to convert arrow files to parquet. {err:?}"), Err(err) => error!("Failed to join async task: {err}"), } } - if let Err(e) = PARSEABLE - .storage - .get_object_store() - .upload_files_from_staging() - .await - { - warn!("Failed to sync local data with object store. {:?}", e); - } else { - info!("Successfully synced all data to S3."); + // Sync object store + let mut object_store_joinset = JoinSet::new(); + sync_all_streams(&mut object_store_joinset); + + while let Some(res) = object_store_joinset.join_next().await { + match res { + Ok(Ok(_)) => info!("Successfully synced all data to S3."), + Ok(Err(err)) => error!("Failed to sync local data with object store. 
{err:?}"), + Err(err) => error!("Failed to join async task: {err}"), + } } } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index b138a292c..e8a256925 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -27,6 +27,7 @@ use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::handlers::http::{rbac, role}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; +use crate::sync::sync_start; use crate::{analytics, migration, storage, sync}; use actix_web::web::{resource, ServiceConfig}; use actix_web::{web, Scope}; @@ -126,6 +127,13 @@ impl ParseableServer for QueryServer { if init_cluster_metrics_schedular().is_ok() { info!("Cluster metrics scheduler started successfully"); } + + // local sync on init + tokio::spawn(async { + if let Err(e) = sync_start().await { + tracing::warn!("local sync on server start failed: {e}"); + } + }); if let Some(hot_tier_manager) = HotTierManager::global() { hot_tier_manager.put_internal_stream_hot_tier().await?; hot_tier_manager.download_from_s3()?; diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 467712646..64b70b46f 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -33,6 +33,7 @@ use crate::metrics; use crate::migration; use crate::storage; use crate::sync; +use crate::sync::sync_start; use actix_web::web; use actix_web::web::resource; @@ -122,6 +123,13 @@ impl ParseableServer for Server { storage::retention::load_retention_from_global(); + // local sync on init + tokio::spawn(async { + if let Err(e) = sync_start().await { + tracing::warn!("local sync on server start failed: {e}"); + } + }); + if let Some(hot_tier_manager) = HotTierManager::global() { hot_tier_manager.download_from_s3()?; }; diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 372bfa885..d131ac0c8 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -30,7 +30,7 @@ use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader}; use arrow_schema::Schema; use byteorder::{LittleEndian, ReadBytesExt}; use itertools::kmerge_by; -use tracing::{error, warn}; +use tracing::error; use crate::{ event::DEFAULT_TIMESTAMP_KEY, @@ -85,20 +85,22 @@ impl MergedReverseRecordReader { pub fn try_new(file_paths: &[PathBuf]) -> Self { let mut readers = Vec::with_capacity(file_paths.len()); for path in file_paths { - let Ok(file) = File::open(path) else { - warn!("Error when trying to read file: {path:?}"); - continue; - }; - - let reader = match get_reverse_reader(file) { - Ok(r) => r, + match File::open(path) { Err(err) => { - error!("Invalid file detected, ignoring it: {path:?}; error = {err}"); + error!("Error when trying to read file: {path:?}; error = {err}"); continue; } - }; - - readers.push(reader); + Ok(file) => { + let reader = match get_reverse_reader(file) { + Ok(r) => r, + Err(err) => { + error!("Invalid file detected, ignoring it: {path:?}; error = {err}"); + continue; + } + }; + readers.push(reader); + } + } } Self { readers } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index c67c60043..09997921f 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -19,10 +19,9 @@ use std::{ collections::{HashMap, HashSet}, - fs::{remove_file, write, File, OpenOptions}, + fs::{self, remove_file, write, File, OpenOptions}, num::NonZeroU32, path::{Path, PathBuf}, - process, sync::{Arc, Mutex, 
RwLock}, time::{Instant, SystemTime, UNIX_EPOCH}, }; @@ -39,7 +38,6 @@ use parquet::{ format::SortingColumn, schema::types::ColumnPath, }; -use rand::distributions::DistString; use relative_path::RelativePathBuf; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; @@ -50,6 +48,7 @@ use crate::{ format::{LogSource, LogSourceEntry}, DEFAULT_TIMESTAMP_KEY, }, + handlers::http::modal::{ingest_server::INGESTOR_META, query_server::QUERIER_META}, metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, @@ -68,14 +67,13 @@ use super::{ }; /// Returns the filename for parquet if provided arrows file path is valid as per our expectation -fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { +fn arrow_path_to_parquet(staging_path: &Path, path: &Path, random_string: &str) -> Option { let filename = path.file_stem()?.to_str()?; let (_, front) = filename.split_once('.')?; assert!(front.contains('.'), "contains the delim `.`"); let filename_with_random_number = format!("{front}.{random_string}.parquet"); - let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename_with_random_number); - + let mut parquet_path = staging_path.to_owned(); + parquet_path.push(filename_with_random_number); Some(parquet_path) } @@ -114,7 +112,7 @@ impl Stream { let data_path = options.local_stream_data_path(&stream_name); Arc::new(Self { - stream_name, + stream_name: stream_name.clone(), metadata: RwLock::new(metadata), data_path, options, @@ -132,7 +130,16 @@ impl Stream { custom_partition_values: &HashMap, stream_type: StreamType, ) -> Result<(), StagingError> { - let mut guard = self.writer.lock().unwrap(); + let mut guard = match self.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while ingesting data for stream {}", + self.stream_name + ); + poisoned.into_inner() + } + }; if self.options.mode != Mode::Query || stream_type == StreamType::Internal { let filename = self.filename_by_partition(schema_key, parsed_timestamp, custom_partition_values); @@ -191,17 +198,47 @@ impl Stream { return vec![]; }; - let paths = dir - .flatten() + dir.flatten() .map(|file| file.path()) .filter(|file| { file.extension() .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION)) }) - .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) - .collect(); + .filter_map(|f| { + let modified = f.metadata().ok().and_then(|m| m.modified().ok()); + modified.map(|modified_time| (f, modified_time)) + }) + .sorted_by_key(|(_, modified_time)| *modified_time) + .map(|(f, _)| f) + .collect() + } + + pub fn inprocess_arrow_files(&self) -> Vec { + let Ok(dir) = self.data_path.read_dir() else { + return vec![]; + }; - paths + //iterate through all the inprocess_ directories and collect all arrow files + dir.filter_map(|entry| { + let path = entry.ok()?.path(); + if path.is_dir() && path.file_name()?.to_str()?.starts_with("inprocess_") { + Some(path) + } else { + None + } + }) + .flat_map(|dir| { + fs::read_dir(dir) + .ok() + .into_iter() + .flatten() + .filter_map(|entry| entry.ok().map(|file| file.path())) + }) + .filter(|file| { + file.extension() + .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION)) + }) + .collect::>() } /// Groups arrow files which are to be included in one parquet @@ -212,13 +249,98 @@ impl Stream { pub fn arrow_files_grouped_exclude_time( &self, exclude: SystemTime, + group_minute: u128, + init_signal: bool, shutdown_signal: bool, ) -> HashMap> { - let mut grouped_arrow_file: HashMap> = HashMap::new(); - let mut arrow_files = 
self.arrow_files(); + let random_string = self.get_node_id_string(); + let inprocess_dir = Self::inprocess_folder(&self.data_path, group_minute); + + let arrow_files = self.fetch_arrow_files_for_conversion(exclude, shutdown_signal); + if !arrow_files.is_empty() { + if let Err(e) = fs::create_dir_all(&inprocess_dir) { + error!("Failed to create inprocess directory: {e}"); + return HashMap::new(); + } + + self.move_arrow_files(arrow_files, &inprocess_dir); + } + if init_signal { + // Group from all inprocess folders + return self.group_inprocess_arrow_files(&random_string); + } - // if the shutdown signal is false i.e. normal condition - // don't keep the ones for the current minute + self.group_single_inprocess_arrow_files(&inprocess_dir, &random_string) + } + + /// Groups arrow files only from the specified inprocess folder + fn group_single_inprocess_arrow_files( + &self, + inprocess_dir: &Path, + random_string: &str, + ) -> HashMap> { + let mut grouped: HashMap> = HashMap::new(); + let Ok(dir) = fs::read_dir(inprocess_dir) else { + return grouped; + }; + for entry in dir.flatten() { + let path = entry.path(); + if path + .extension() + .is_some_and(|ext| ext.eq(ARROW_FILE_EXTENSION)) + { + if let Some(parquet_path) = + arrow_path_to_parquet(&self.data_path, &path, random_string) + { + grouped.entry(parquet_path).or_default().push(path); + } else { + warn!("Unexpected arrow file: {}", path.display()); + } + } + } + grouped + } + + /// Returns the node id string for file naming. + fn get_node_id_string(&self) -> String { + match self.options.mode { + Mode::Query => QUERIER_META + .get() + .map(|querier_metadata| querier_metadata.get_node_id()) + .expect("Querier metadata should be set"), + Mode::Ingest => INGESTOR_META + .get() + .map(|ingestor_metadata| ingestor_metadata.get_node_id()) + .expect("Ingestor metadata should be set"), + _ => "000000000000000".to_string(), + } + } + + /// Returns a mapping for inprocess arrow files (init_signal=true). + fn group_inprocess_arrow_files(&self, random_string: &str) -> HashMap> { + let mut grouped: HashMap> = HashMap::new(); + for inprocess_file in self.inprocess_arrow_files() { + if let Some(parquet_path) = + arrow_path_to_parquet(&self.data_path, &inprocess_file, random_string) + { + grouped + .entry(parquet_path) + .or_default() + .push(inprocess_file); + } else { + warn!("Unexpected arrow file: {}", inprocess_file.display()); + } + } + grouped + } + + /// Returns arrow files for conversion, filtering by time and removing invalid files. + fn fetch_arrow_files_for_conversion( + &self, + exclude: SystemTime, + shutdown_signal: bool, + ) -> Vec { + let mut arrow_files = self.arrow_files(); if !shutdown_signal { arrow_files.retain(|path| { let creation = path @@ -230,26 +352,46 @@ impl Stream { minute_from_system_time(creation) < minute_from_system_time(exclude) }); } + arrow_files + } - let random_string = - rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15); + /// Moves eligible arrow files to the inprocess folder and groups them by parquet path. 
+ fn move_arrow_files(&self, arrow_files: Vec, inprocess_dir: &Path) { for arrow_file_path in arrow_files { - if arrow_file_path.metadata().unwrap().len() == 0 { - error!( - "Invalid arrow file {:?} detected for stream {}, removing it", - &arrow_file_path, self.stream_name - ); - remove_file(&arrow_file_path).unwrap(); - } else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) { - grouped_arrow_file - .entry(key) - .or_default() - .push(arrow_file_path); - } else { - warn!("Unexpected arrows file: {}", arrow_file_path.display()); + match arrow_file_path.metadata() { + Ok(meta) if meta.len() == 0 => { + error!( + "Invalid arrow file {:?} detected for stream {}, removing it", + &arrow_file_path, self.stream_name + ); + remove_file(&arrow_file_path).expect("File should be removed"); + } + Ok(_) => { + let new_path = inprocess_dir.join( + arrow_file_path + .file_name() + .expect("Arrow file should have a name"), + ); + if let Err(e) = fs::rename(&arrow_file_path, &new_path) { + error!( + "Failed to rename arrow file to inprocess directory: {} -> {}: {e}", + arrow_file_path.display(), + new_path.display() + ); + } + } + Err(e) => { + warn!( + "Could not get metadata for arrow file {}: {e}", + arrow_file_path.display() + ); + } } } - grouped_arrow_file + } + + fn inprocess_folder(base: &Path, minute: u128) -> PathBuf { + base.join(format!("inprocess_{}", minute)) } pub fn parquet_files(&self) -> Vec { @@ -306,7 +448,11 @@ impl Stream { } /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal` - pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> { + pub fn prepare_parquet( + &self, + init_signal: bool, + shutdown_signal: bool, + ) -> Result<(), StagingError> { info!( "Starting arrow_conversion job for stream- {}", self.stream_name @@ -321,6 +467,7 @@ impl Stream { .convert_disk_files_to_parquet( time_partition.as_ref(), custom_partition.as_ref(), + init_signal, shutdown_signal, ) .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?; @@ -339,11 +486,6 @@ impl Stream { let staging_schemas = self.get_schemas_if_present(); if let Some(mut staging_schemas) = staging_schemas { - warn!( - "Found {} schemas in staging for stream- {}", - staging_schemas.len(), - self.stream_name - ); staging_schemas.push(schema); schema = Schema::try_merge(staging_schemas)?; } @@ -367,7 +509,16 @@ impl Stream { } pub fn flush(&self, forced: bool) { - let mut writer = self.writer.lock().unwrap(); + let mut writer = match self.writer.lock() { + Ok(guard) => guard, + Err(poisoned) => { + error!( + "Writer lock poisoned while flushing data for stream {}", + self.stream_name + ); + poisoned.into_inner() + } + }; // Flush memory writer.mem.clear(); // Drop schema -> disk writer mapping, triggers flush to disk @@ -421,38 +572,24 @@ impl Stream { props.set_sorting_columns(Some(sorting_column_vec)).build() } - /// This function reads arrow files, groups their schemas - /// - /// converts them into parquet files and returns a merged schema - pub fn convert_disk_files_to_parquet( - &self, - time_partition: Option<&String>, - custom_partition: Option<&String>, - shutdown_signal: bool, - ) -> Result, StagingError> { - let mut schemas = Vec::new(); - - let now = SystemTime::now(); - let staging_files = self.arrow_files_grouped_exclude_time(now, shutdown_signal); - if staging_files.is_empty() { - metrics::STAGING_FILES - .with_label_values(&[&self.stream_name]) - .set(0); - metrics::STORAGE_SIZE 
- .with_label_values(&["staging", &self.stream_name, "arrows"]) - .set(0); - metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, "parquet"]) - .set(0); - } + fn reset_staging_metrics(&self) { + metrics::STAGING_FILES + .with_label_values(&[&self.stream_name]) + .set(0); + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, "arrows"]) + .set(0); + metrics::STORAGE_SIZE + .with_label_values(&["staging", &self.stream_name, "parquet"]) + .set(0); + } - //find sum of arrow files in staging directory for a stream + fn update_staging_metrics(&self, staging_files: &HashMap>) { let total_arrow_files = staging_files.values().map(|v| v.len()).sum::(); metrics::STAGING_FILES .with_label_values(&[&self.stream_name]) .set(total_arrow_files as i64); - //find sum of file sizes of all arrow files in staging_files let total_arrow_files_size = staging_files .values() .map(|v| { @@ -464,6 +601,31 @@ impl Stream { metrics::STORAGE_SIZE .with_label_values(&["staging", &self.stream_name, "arrows"]) .set(total_arrow_files_size as i64); + } + + /// This function reads arrow files, groups their schemas + /// + /// converts them into parquet files and returns a merged schema + pub fn convert_disk_files_to_parquet( + &self, + time_partition: Option<&String>, + custom_partition: Option<&String>, + init_signal: bool, + shutdown_signal: bool, + ) -> Result, StagingError> { + let mut schemas = Vec::new(); + + let now = SystemTime::now(); + let group_minute = minute_from_system_time(now) - 1; + let staging_files = + self.arrow_files_grouped_exclude_time(now, group_minute, init_signal, shutdown_signal); + if staging_files.is_empty() { + self.reset_staging_metrics(); + return Ok(None); + } + + //find sum of arrow files in staging directory for a stream + self.update_staging_metrics(&staging_files); // warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, arrow_files) in staging_files { @@ -496,34 +658,53 @@ impl Stream { "Invalid parquet file {part_path:?} detected for stream {}, removing it", &self.stream_name ); - remove_file(part_path).unwrap(); - } else { - trace!("Parquet file successfully constructed"); - if let Err(e) = std::fs::rename(&part_path, &parquet_path) { - error!( - "Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}" - ); - } + remove_file(part_path).expect("File should be removable if it is invalid"); + continue; + } + trace!("Parquet file successfully constructed"); - for file in arrow_files { - let file_size = match file.metadata() { - Ok(meta) => meta.len(), + if let Err(e) = std::fs::rename(&part_path, &parquet_path) { + error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"); + } else { + // delete the files that were grouped to create parquet file + for (i, file) in arrow_files.iter().enumerate() { + match file.metadata() { + Ok(meta) => { + let file_size = meta.len(); + match remove_file(file) { + Ok(_) => { + metrics::STORAGE_SIZE + .with_label_values(&[ + "staging", + &self.stream_name, + ARROW_FILE_EXTENSION, + ]) + .sub(file_size as i64); + } + Err(e) => { + warn!("Failed to delete file {}: {e}", file.display()); + } + } + } Err(err) => { warn!("File ({}) not found; Error = {err}", file.display()); - continue; } - }; - if remove_file(&file).is_err() { - error!("Failed to delete file. 
Unstable state"); - process::abort() } - metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION]) - .sub(file_size as i64); + + // After deleting the last file, try to remove the inprocess directory + if i == arrow_files.len() - 1 { + if let Some(parent_dir) = file.parent() { + if let Err(err) = fs::remove_dir(parent_dir) { + warn!( + "Failed to remove inprocess directory {}: {err}", + parent_dir.display() + ); + } + } + } } } } - if schemas.is_empty() { return Ok(None); } @@ -725,7 +906,11 @@ impl Stream { } /// First flushes arrows onto disk and then converts the arrow into parquet - pub fn flush_and_convert(&self, shutdown_signal: bool) -> Result<(), StagingError> { + pub fn flush_and_convert( + &self, + init_signal: bool, + shutdown_signal: bool, + ) -> Result<(), StagingError> { let start_flush = Instant::now(); self.flush(shutdown_signal); trace!( @@ -735,7 +920,7 @@ impl Stream { ); let start_convert = Instant::now(); - self.prepare_parquet(shutdown_signal)?; + self.prepare_parquet(init_signal, shutdown_signal)?; trace!( "Converting arrows to parquet on stream ({}) took: {}s", self.stream_name, @@ -820,6 +1005,7 @@ impl Streams { pub fn flush_and_convert( &self, joinset: &mut JoinSet>, + init_signal: bool, shutdown_signal: bool, ) { let streams: Vec> = self @@ -829,7 +1015,7 @@ impl Streams { .map(Arc::clone) .collect(); for stream in streams { - joinset.spawn(async move { stream.flush_and_convert(shutdown_signal) }); + joinset.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal) }); } } } @@ -1018,7 +1204,7 @@ mod tests { LogStreamMetadata::default(), None, ) - .convert_disk_files_to_parquet(None, None, false)?; + .convert_disk_files_to_parquet(None, None, false, false)?; assert!(result.is_none()); // Verify metrics were set to 0 let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get(); @@ -1097,7 +1283,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, true) + .convert_disk_files_to_parquet(None, None, false, true) .unwrap(); assert!(result.is_some()); @@ -1146,7 +1332,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, true) + .convert_disk_files_to_parquet(None, None, false, true) .unwrap(); assert!(result.is_some()); @@ -1200,7 +1386,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, false) + .convert_disk_files_to_parquet(None, None, false, false) .unwrap(); assert!(result.is_some()); @@ -1228,7 +1414,7 @@ mod tests { let file_path = create_test_file(&temp_dir, filename); let random_string = "random123"; - let result = arrow_path_to_parquet(&file_path, random_string); + let result = arrow_path_to_parquet(&file_path, &file_path, random_string); assert!(result.is_some()); let parquet_path = result.unwrap(); @@ -1253,7 +1439,7 @@ mod tests { let random_string = "random456"; - let result = arrow_path_to_parquet(&file_path, random_string); + let result = arrow_path_to_parquet(&file_path, &file_path, random_string); assert!(result.is_some()); let parquet_path = result.unwrap(); diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 
49da6b8fd..053b1349d 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -38,6 +38,7 @@ use object_store::ObjectMeta; use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; +use tokio::task::JoinSet; use tracing::info; use tracing::{error, warn}; use ulid::Ulid; @@ -798,79 +799,75 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(correlation_bytes) } - async fn upload_files_from_staging(&self) -> Result<(), ObjectStorageError> { + async fn upload_files_from_staging(&self, stream_name: &str) -> Result<(), ObjectStorageError> { if !PARSEABLE.options.staging_dir().exists() { return Ok(()); } + info!("Starting object_store_sync for stream- {stream_name}"); + + let stream = PARSEABLE.get_or_create_stream(stream_name); + let custom_partition = stream.get_custom_partition(); + for path in stream.parquet_files() { + let filename = path + .file_name() + .expect("only parquet files are returned by iterator") + .to_str() + .expect("filename is valid string"); + + let mut file_suffix = str::replacen(filename, ".", "/", 3); + + let custom_partition_clone = custom_partition.clone(); + if custom_partition_clone.is_some() { + let custom_partition_fields = custom_partition_clone.unwrap(); + let custom_partition_list = + custom_partition_fields.split(',').collect::>(); + file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + } - // get all streams - for stream_name in PARSEABLE.streams.list() { - info!("Starting object_store_sync for stream- {stream_name}"); - - let stream = PARSEABLE.get_or_create_stream(&stream_name); - let custom_partition = stream.get_custom_partition(); - for path in stream.parquet_files() { - let filename = path - .file_name() - .expect("only parquet files are returned by iterator") - .to_str() - .expect("filename is valid string"); - - let mut file_date_part = filename.split('.').collect::>()[0]; - file_date_part = file_date_part.split('=').collect::>()[1]; - let compressed_size = path.metadata().map_or(0, |meta| meta.len()); - STORAGE_SIZE - .with_label_values(&["data", &stream_name, "parquet"]) - .add(compressed_size as i64); - EVENTS_STORAGE_SIZE_DATE - .with_label_values(&["data", &stream_name, "parquet", file_date_part]) - .add(compressed_size as i64); - LIFETIME_EVENTS_STORAGE_SIZE - .with_label_values(&["data", &stream_name, "parquet"]) - .add(compressed_size as i64); - let mut file_suffix = str::replacen(filename, ".", "/", 3); - - let custom_partition_clone = custom_partition.clone(); - if custom_partition_clone.is_some() { - let custom_partition_fields = custom_partition_clone.unwrap(); - let custom_partition_list = - custom_partition_fields.split(',').collect::>(); - file_suffix = - str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); - } - - let stream_relative_path = format!("{stream_name}/{file_suffix}"); - - // Try uploading the file, handle potential errors without breaking the loop - // if let Err(e) = self.upload_multipart(key, path) - if let Err(e) = self - .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) - .await - { - error!("Failed to upload file {filename:?}: {e}"); - continue; // Skip to the next file - } + let stream_relative_path = format!("{stream_name}/{file_suffix}"); - let absolute_path = self - .absolute_url(RelativePath::from_path(&stream_relative_path).unwrap()) - .to_string(); - let store = PARSEABLE.storage().get_object_store(); - let manifest = - 
catalog::create_from_parquet_file(absolute_path.clone(), &path).unwrap(); - catalog::update_snapshot(store, &stream_name, manifest).await?; + // Try uploading the file, handle potential errors without breaking the loop + // if let Err(e) = self.upload_multipart(key, path) + if let Err(e) = self + .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) + .await + { + error!("Failed to upload file {filename:?}: {e}"); + continue; // Skip to the next file + } + let mut file_date_part = filename.split('.').collect::>()[0]; + file_date_part = file_date_part.split('=').collect::>()[1]; + let compressed_size = path.metadata().map_or(0, |meta| meta.len()); + STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(compressed_size as i64); + EVENTS_STORAGE_SIZE_DATE + .with_label_values(&["data", stream_name, "parquet", file_date_part]) + .add(compressed_size as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(compressed_size as i64); + + let absolute_path = self + .absolute_url( + RelativePath::from_path(&stream_relative_path).expect("valid relative path"), + ) + .to_string(); + let store = PARSEABLE.storage().get_object_store(); + let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)?; + catalog::update_snapshot(store, stream_name, manifest).await?; - if let Err(e) = remove_file(path) { - warn!("Failed to remove staged file: {e}"); - } + if let Err(e) = remove_file(path) { + warn!("Failed to remove staged file: {e}"); } + } - for path in stream.schema_files() { - let file = File::open(&path)?; - let schema: Schema = serde_json::from_reader(file)?; - commit_schema_to_storage(&stream_name, schema).await?; - if let Err(e) = remove_file(path) { - warn!("Failed to remove staged file: {e}"); - } + for path in stream.schema_files() { + let file = File::open(&path)?; + let schema: Schema = serde_json::from_reader(file)?; + commit_schema_to_storage(stream_name, schema).await?; + if let Err(e) = remove_file(path) { + warn!("Failed to remove staged file: {e}"); } } @@ -878,6 +875,26 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } } +pub fn sync_all_streams(joinset: &mut JoinSet>) { + let object_store = PARSEABLE.storage.get_object_store(); + for stream_name in PARSEABLE.streams.list() { + let object_store = object_store.clone(); + joinset.spawn(async move { + let start = Instant::now(); + info!("Starting object_store_sync for stream- {stream_name}"); + let result = object_store.upload_files_from_staging(&stream_name).await; + if let Err(e) = result { + error!("Failed to upload files from staging for stream {stream_name}: {e}"); + } else { + info!( + "Completed object_store_sync for stream- {stream_name} in {} ms", + start.elapsed().as_millis() + ); + } + Ok(()) + }); + } +} pub async fn commit_schema_to_storage( stream_name: &str, schema: Schema, diff --git a/src/sync.rs b/src/sync.rs index 6fa568331..d2ab9c158 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -28,6 +28,7 @@ use tracing::{error, info, trace, warn}; use crate::alerts::{alerts_utils, AlertTask}; use crate::parseable::PARSEABLE; +use crate::storage::object_storage::sync_all_streams; use crate::{LOCAL_SYNC_INTERVAL, STORAGE_UPLOAD_INTERVAL}; // Calculates the instant that is the start of the next minute @@ -75,7 +76,7 @@ where /// Flushes arrows onto disk every `LOCAL_SYNC_INTERVAL` seconds, packs arrows into parquet every /// `STORAGE_CONVERSION_INTERVAL` secondsand uploads them every `STORAGE_UPLOAD_INTERVAL` seconds. 
-#[tokio::main(flavor = "multi_thread", worker_threads = 2)] +#[tokio::main(flavor = "multi_thread")] pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> { let (localsync_handler, mut localsync_outbox, localsync_inbox) = local_sync(); let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = @@ -119,39 +120,45 @@ pub fn object_store_sync() -> ( let (inbox_tx, inbox_rx) = oneshot::channel::<()>(); let handle = task::spawn(async move { + info!("Object store sync task started"); + let mut inbox_rx = inbox_rx; + let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL); - - let mut inbox_rx = AssertUnwindSafe(inbox_rx); + let mut joinset = JoinSet::new(); loop { select! { _ = sync_interval.tick() => { trace!("Syncing Parquets to Object Store... "); - if let Err(e) = monitor_task_duration( - "object_store_sync", - Duration::from_secs(15), - || async { - PARSEABLE - .storage - .get_object_store() - .upload_files_from_staging().await - }, - ) - .await - { - warn!("failed to upload local data with object store. {e:?}"); + sync_all_streams(&mut joinset) + }, + Some(res) = joinset.join_next(), if !joinset.is_empty() => { + match res { + Ok(Ok(_)) => info!("Successfully uploaded files to object store."), + Ok(Err(err)) => warn!("Failed to upload files to object store. {err:?}"), + Err(err) => error!("Issue joining object store sync task: {err}"), } }, - res = &mut inbox_rx => {match res{ - Ok(_) => break, - Err(_) => { - warn!("Inbox channel closed unexpectedly"); - break; - }} + res = &mut inbox_rx => { + match res { + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + } + } } } } + // Drain remaining joinset tasks + while let Some(res) = joinset.join_next().await { + match res { + Ok(Ok(_)) => info!("Successfully uploaded files to object store."), + Ok(Err(err)) => warn!("Failed to upload files to object store. {err:?}"), + Err(err) => error!("Issue joining object store sync task: {err}"), + } + } })); match result { @@ -160,10 +167,10 @@ pub fn object_store_sync() -> ( } Err(panic_error) => { error!("Panic in object store sync task: {panic_error:?}"); - let _ = outbox_tx.send(()); } } + let _ = outbox_tx.send(()); info!("Object store sync task ended"); }); @@ -191,7 +198,7 @@ pub fn local_sync() -> ( select! { // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds _ = sync_interval.tick() => { - PARSEABLE.streams.flush_and_convert(&mut joinset, false) + PARSEABLE.streams.flush_and_convert(&mut joinset, false, false) }, // Joins and logs errors in spawned tasks Some(res) = joinset.join_next(), if !joinset.is_empty() => { @@ -210,6 +217,15 @@ pub fn local_sync() -> ( } } } + + // Drain remaining joinset tasks + while let Some(res) = joinset.join_next().await { + match res { + Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), + Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. 
{err:?}"), + Err(err) => error!("Issue joining flush+conversion task: {err}"), + } + } })); match result { @@ -228,6 +244,32 @@ pub fn local_sync() -> ( (handle, outbox_rx, inbox_tx) } +// local sync at the start of the server +pub async fn sync_start() -> anyhow::Result<()> { + let mut local_sync_joinset = JoinSet::new(); + PARSEABLE + .streams + .flush_and_convert(&mut local_sync_joinset, true, false); + while let Some(res) = local_sync_joinset.join_next().await { + match res { + Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), + Ok(Err(err)) => return Err(err.into()), + Err(err) => error!("Failed to join async task: {err}"), + } + } + + let mut object_store_joinset = JoinSet::new(); + sync_all_streams(&mut object_store_joinset); + while let Some(res) = object_store_joinset.join_next().await { + match res { + Ok(Ok(_)) => info!("Successfully synced all data to S3."), + Ok(Err(err)) => return Err(err.into()), + Err(err) => error!("Failed to join async task: {err}"), + } + } + Ok(()) +} + /// A separate runtime for running all alert tasks #[tokio::main(flavor = "multi_thread")] pub async fn alert_runtime(mut rx: mpsc::Receiver) -> Result<(), anyhow::Error> { From bf532de334696ff7e2268c4b95f661c66ea2166a Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 15 Jun 2025 22:54:44 -0400 Subject: [PATCH 2/8] refactor --- src/parseable/staging/reader.rs | 28 ++++-- src/parseable/streams.rs | 172 ++++++++++++++++++++------------ src/storage/object_storage.rs | 4 +- src/sync.rs | 47 ++++----- 4 files changed, 144 insertions(+), 107 deletions(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index d131ac0c8..777070e75 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -48,18 +48,26 @@ impl MergedRecordReader { for file in files { //remove empty files before reading - if file.metadata().unwrap().len() == 0 { - error!("Invalid file detected, removing it: {:?}", file); - remove_file(file).unwrap(); - } else { - let Ok(reader) = - StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) - else { - error!("Invalid file detected, ignoring it: {:?}", file); + match file.metadata() { + Err(err) => { + error!("Error when trying to read file: {file:?}; error = {err}"); + continue; + } + Ok(metadata) if metadata.len() == 0 => { + error!("Empty file detected, removing it: {:?}", file); + remove_file(file).unwrap(); continue; - }; + } + Ok(_) => { + let Ok(reader) = + StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) + else { + error!("Invalid file detected, ignoring it: {:?}", file); + continue; + }; - readers.push(reader); + readers.push(reader); + } } } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 09997921f..dc8ed1a4b 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -67,12 +67,16 @@ use super::{ }; /// Returns the filename for parquet if provided arrows file path is valid as per our expectation -fn arrow_path_to_parquet(staging_path: &Path, path: &Path, random_string: &str) -> Option { +fn arrow_path_to_parquet( + stream_staging_path: &Path, + path: &Path, + random_string: &str, +) -> Option { let filename = path.file_stem()?.to_str()?; let (_, front) = filename.split_once('.')?; assert!(front.contains('.'), "contains the delim `.`"); let filename_with_random_number = format!("{front}.{random_string}.parquet"); - let mut parquet_path = staging_path.to_owned(); + let mut parquet_path = stream_staging_path.to_owned(); 
parquet_path.push(filename_with_random_number); Some(parquet_path) } @@ -345,9 +349,10 @@ impl Stream { arrow_files.retain(|path| { let creation = path .metadata() - .expect("Arrow file should exist on disk") - .created() - .expect("Creation time should be accessible"); + .ok() + .and_then(|meta| meta.created().or_else(|_| meta.modified()).ok()) + .expect("Arrow file should have a valid creation or modified time"); + // Compare if creation time is actually from previous minute minute_from_system_time(creation) < minute_from_system_time(exclude) }); @@ -594,7 +599,7 @@ impl Stream { .values() .map(|v| { v.iter() - .map(|file| file.metadata().unwrap().len()) + .filter_map(|file| file.metadata().ok().map(|meta| meta.len())) .sum::() }) .sum::(); @@ -624,92 +629,129 @@ impl Stream { return Ok(None); } - //find sum of arrow files in staging directory for a stream self.update_staging_metrics(&staging_files); - // warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, arrow_files) in staging_files { let record_reader = MergedReverseRecordReader::try_new(&arrow_files); if record_reader.readers.is_empty() { continue; } let merged_schema = record_reader.merged_schema(); - let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition); schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); - let mut part_path = parquet_path.to_owned(); - part_path.set_extension("part"); - let mut part_file = OpenOptions::new() - .create(true) - .append(true) - .open(&part_path) - .map_err(|_| StagingError::Create)?; - let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props))?; - for ref record in record_reader.merged_iter(schema, time_partition.cloned()) { - writer.write(record)?; - } - writer.close()?; - if part_file.metadata().expect("File was just created").len() - < parquet::file::FOOTER_SIZE as u64 - { - error!( - "Invalid parquet file {part_path:?} detected for stream {}, removing it", - &self.stream_name - ); - remove_file(part_path).expect("File should be removable if it is invalid"); + let part_path = parquet_path.with_extension("part"); + if !self.write_parquet_part_file( + &part_path, + record_reader, + &schema, + &props, + time_partition, + )? 
{ continue; } - trace!("Parquet file successfully constructed"); - if let Err(e) = std::fs::rename(&part_path, &parquet_path) { + if let Err(e) = self.finalize_parquet_file(&part_path, &parquet_path) { error!("Couldn't rename part file: {part_path:?} -> {parquet_path:?}, error = {e}"); } else { - // delete the files that were grouped to create parquet file - for (i, file) in arrow_files.iter().enumerate() { - match file.metadata() { - Ok(meta) => { - let file_size = meta.len(); - match remove_file(file) { - Ok(_) => { - metrics::STORAGE_SIZE - .with_label_values(&[ - "staging", - &self.stream_name, - ARROW_FILE_EXTENSION, - ]) - .sub(file_size as i64); - } - Err(e) => { - warn!("Failed to delete file {}: {e}", file.display()); - } - } + self.cleanup_arrow_files_and_dir(&arrow_files); + } + } + + if schemas.is_empty() { + return Ok(None); + } + + Ok(Some(Schema::try_merge(schemas).unwrap())) + } + + fn write_parquet_part_file( + &self, + part_path: &Path, + record_reader: MergedReverseRecordReader, + schema: &Arc, + props: &WriterProperties, + time_partition: Option<&String>, + ) -> Result { + let mut part_file = OpenOptions::new() + .create(true) + .append(true) + .open(part_path) + .map_err(|_| StagingError::Create)?; + let mut writer = ArrowWriter::try_new(&mut part_file, schema.clone(), Some(props.clone()))?; + for ref record in record_reader.merged_iter(schema.clone(), time_partition.cloned()) { + writer.write(record)?; + } + writer.close()?; + + if part_file.metadata().expect("File was just created").len() + < parquet::file::FOOTER_SIZE as u64 + { + error!( + "Invalid parquet file {part_path:?} detected for stream {}, removing it", + &self.stream_name + ); + remove_file(part_path).expect("File should be removable if it is invalid"); + return Ok(false); + } + trace!("Parquet file successfully constructed"); + Ok(true) + } + + fn finalize_parquet_file(&self, part_path: &Path, parquet_path: &Path) -> std::io::Result<()> { + std::fs::rename(part_path, parquet_path) + } + + fn cleanup_arrow_files_and_dir(&self, arrow_files: &[PathBuf]) { + for (i, file) in arrow_files.iter().enumerate() { + match file.metadata() { + Ok(meta) => { + let file_size = meta.len(); + match remove_file(file) { + Ok(_) => { + metrics::STORAGE_SIZE + .with_label_values(&[ + "staging", + &self.stream_name, + ARROW_FILE_EXTENSION, + ]) + .sub(file_size as i64); } - Err(err) => { - warn!("File ({}) not found; Error = {err}", file.display()); + Err(e) => { + warn!("Failed to delete file {}: {e}", file.display()); } } + } + Err(err) => { + warn!("File ({}) not found; Error = {err}", file.display()); + } + } - // After deleting the last file, try to remove the inprocess directory - if i == arrow_files.len() - 1 { - if let Some(parent_dir) = file.parent() { - if let Err(err) = fs::remove_dir(parent_dir) { - warn!( - "Failed to remove inprocess directory {}: {err}", - parent_dir.display() - ); + // After deleting the last file, try to remove the inprocess directory if empty + if i == arrow_files.len() - 1 { + if let Some(parent_dir) = file.parent() { + match fs::read_dir(parent_dir) { + Ok(mut entries) => { + if entries.next().is_none() { + if let Err(err) = fs::remove_dir(parent_dir) { + warn!( + "Failed to remove inprocess directory {}: {err}", + parent_dir.display() + ); + } } } + Err(err) => { + warn!( + "Failed to read inprocess directory {}: {err}", + parent_dir.display() + ); + } } } } } - if schemas.is_empty() { - return Ok(None); - } - - Ok(Some(Schema::try_merge(schemas).unwrap())) } pub fn updated_schema(&self, 
current_schema: Schema) -> Schema { diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 053b1349d..00e262631 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -883,7 +883,7 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { let start = Instant::now(); info!("Starting object_store_sync for stream- {stream_name}"); let result = object_store.upload_files_from_staging(&stream_name).await; - if let Err(e) = result { + if let Err(ref e) = result { error!("Failed to upload files from staging for stream {stream_name}: {e}"); } else { info!( @@ -891,7 +891,7 @@ pub fn sync_all_streams(joinset: &mut JoinSet>) { start.elapsed().as_millis() ); } - Ok(()) + result }); } } diff --git a/src/sync.rs b/src/sync.rs index d2ab9c158..84af580b0 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -134,11 +134,7 @@ pub fn object_store_sync() -> ( sync_all_streams(&mut joinset) }, Some(res) = joinset.join_next(), if !joinset.is_empty() => { - match res { - Ok(Ok(_)) => info!("Successfully uploaded files to object store."), - Ok(Err(err)) => warn!("Failed to upload files to object store. {err:?}"), - Err(err) => error!("Issue joining object store sync task: {err}"), - } + log_join_result(res, "object store sync"); }, res = &mut inbox_rx => { match res { @@ -153,11 +149,7 @@ pub fn object_store_sync() -> ( } // Drain remaining joinset tasks while let Some(res) = joinset.join_next().await { - match res { - Ok(Ok(_)) => info!("Successfully uploaded files to object store."), - Ok(Err(err)) => warn!("Failed to upload files to object store. {err:?}"), - Err(err) => error!("Issue joining object store sync task: {err}"), - } + log_join_result(res, "object store sync"); } })); @@ -202,11 +194,7 @@ pub fn local_sync() -> ( }, // Joins and logs errors in spawned tasks Some(res) = joinset.join_next(), if !joinset.is_empty() => { - match res { - Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), - Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. {err:?}"), - Err(err) => error!("Issue joining flush+conversion task: {err}"), - } + log_join_result(res, "flush and convert"); } res = &mut inbox_rx => {match res{ Ok(_) => break, @@ -220,11 +208,7 @@ pub fn local_sync() -> ( // Drain remaining joinset tasks while let Some(res) = joinset.join_next().await { - match res { - Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), - Ok(Err(err)) => warn!("Failed to convert arrow files to parquet. 
{err:?}"), - Err(err) => error!("Issue joining flush+conversion task: {err}"), - } + log_join_result(res, "flush and convert"); } })); @@ -251,25 +235,28 @@ pub async fn sync_start() -> anyhow::Result<()> { .streams .flush_and_convert(&mut local_sync_joinset, true, false); while let Some(res) = local_sync_joinset.join_next().await { - match res { - Ok(Ok(_)) => info!("Successfully converted arrow files to parquet."), - Ok(Err(err)) => return Err(err.into()), - Err(err) => error!("Failed to join async task: {err}"), - } + log_join_result(res, "flush and convert"); } let mut object_store_joinset = JoinSet::new(); sync_all_streams(&mut object_store_joinset); while let Some(res) = object_store_joinset.join_next().await { - match res { - Ok(Ok(_)) => info!("Successfully synced all data to S3."), - Ok(Err(err)) => return Err(err.into()), - Err(err) => error!("Failed to join async task: {err}"), - } + log_join_result(res, "object store sync"); } Ok(()) } +fn log_join_result(res: Result, tokio::task::JoinError>, context: &str) +where + E: std::fmt::Debug, +{ + match res { + Ok(Ok(_)) => info!("Successfully completed {context}."), + Ok(Err(err)) => warn!("Failed to complete {context}. {err:?}"), + Err(err) => error!("Issue joining {context} task: {err}"), + } +} + /// A separate runtime for running all alert tasks #[tokio::main(flavor = "multi_thread")] pub async fn alert_runtime(mut rx: mpsc::Receiver) -> Result<(), anyhow::Error> { From a2f3ecebce0004bd1240cff1579e77d729784e71 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Sun, 15 Jun 2025 23:13:07 -0400 Subject: [PATCH 3/8] add monitoring of task in local and object store sync --- src/sync.rs | 106 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 63 insertions(+), 43 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index 84af580b0..f3f266a35 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -84,7 +84,6 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> loop { select! { _ = &mut cancel_rx => { - // actix server finished .. stop other threads and stop the server remote_sync_inbox.send(()).unwrap_or(()); localsync_inbox.send(()).unwrap_or(()); if let Err(e) = localsync_handler.await { @@ -96,12 +95,9 @@ pub async fn handler(mut cancel_rx: oneshot::Receiver<()>) -> anyhow::Result<()> return Ok(()); }, _ = &mut localsync_outbox => { - // crash the server if localsync fails for any reason - // panic!("Local Sync thread died. Server will fail now!") return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) }, _ = &mut remote_sync_outbox => { - // remote_sync failed, this is recoverable by just starting remote_sync thread again if let Err(e) = remote_sync_handler.await { error!("Error joining remote_sync_handler: {e:?}"); } @@ -125,16 +121,26 @@ pub fn object_store_sync() -> ( let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL); - let mut joinset = JoinSet::new(); loop { select! { _ = sync_interval.tick() => { trace!("Syncing Parquets to Object Store... 
"); - sync_all_streams(&mut joinset) - }, - Some(res) = joinset.join_next(), if !joinset.is_empty() => { - log_join_result(res, "object store sync"); + + // Monitor the duration of sync_all_streams execution + monitor_task_duration( + "object_store_sync_all_streams", + Duration::from_secs(15), + || async { + let mut joinset = JoinSet::new(); + sync_all_streams(&mut joinset); + + // Wait for all spawned tasks to complete + while let Some(res) = joinset.join_next().await { + log_join_result(res, "object store sync"); + } + } + ).await; }, res = &mut inbox_rx => { match res { @@ -147,10 +153,6 @@ pub fn object_store_sync() -> ( } } } - // Drain remaining joinset tasks - while let Some(res) = joinset.join_next().await { - log_join_result(res, "object store sync"); - } })); match result { @@ -184,32 +186,37 @@ pub fn local_sync() -> ( let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL); - let mut joinset = JoinSet::new(); loop { select! { // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds _ = sync_interval.tick() => { - PARSEABLE.streams.flush_and_convert(&mut joinset, false, false) + // Monitor the duration of flush_and_convert execution + monitor_task_duration( + "local_sync_flush_and_convert", + Duration::from_secs(15), + || async { + let mut joinset = JoinSet::new(); + PARSEABLE.streams.flush_and_convert(&mut joinset, false, false); + + // Wait for all spawned tasks to complete + while let Some(res) = joinset.join_next().await { + log_join_result(res, "flush and convert"); + } + } + ).await; }, - // Joins and logs errors in spawned tasks - Some(res) = joinset.join_next(), if !joinset.is_empty() => { - log_join_result(res, "flush and convert"); - } - res = &mut inbox_rx => {match res{ - Ok(_) => break, - Err(_) => { - warn!("Inbox channel closed unexpectedly"); - break; - }} + res = &mut inbox_rx => { + match res { + Ok(_) => break, + Err(_) => { + warn!("Inbox channel closed unexpectedly"); + break; + } + } } } } - - // Drain remaining joinset tasks - while let Some(res) = joinset.join_next().await { - log_join_result(res, "flush and convert"); - } })); match result { @@ -228,21 +235,34 @@ pub fn local_sync() -> ( (handle, outbox_rx, inbox_tx) } -// local sync at the start of the server +// local and object store sync at the start of the server pub async fn sync_start() -> anyhow::Result<()> { - let mut local_sync_joinset = JoinSet::new(); - PARSEABLE - .streams - .flush_and_convert(&mut local_sync_joinset, true, false); - while let Some(res) = local_sync_joinset.join_next().await { - log_join_result(res, "flush and convert"); - } + // Monitor local sync duration at startup + monitor_task_duration("startup_local_sync", Duration::from_secs(15), || async { + let mut local_sync_joinset = JoinSet::new(); + PARSEABLE + .streams + .flush_and_convert(&mut local_sync_joinset, true, false); + while let Some(res) = local_sync_joinset.join_next().await { + log_join_result(res, "flush and convert"); + } + }) + .await; + + // Monitor object store sync duration at startup + monitor_task_duration( + "startup_object_store_sync", + Duration::from_secs(15), + || async { + let mut object_store_joinset = JoinSet::new(); + sync_all_streams(&mut object_store_joinset); + while let Some(res) = object_store_joinset.join_next().await { + log_join_result(res, "object store sync"); + } + }, + ) + .await; - let mut object_store_joinset = JoinSet::new(); - sync_all_streams(&mut object_store_joinset); - while let 
Some(res) = object_store_joinset.join_next().await { - log_join_result(res, "object store sync"); - } Ok(()) } From 28e02a5396345ebb73e0c09b7ee5048df63f3270 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 00:57:13 -0400 Subject: [PATCH 4/8] handle panic under async --- src/sync.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/sync.rs b/src/sync.rs index f3f266a35..3f6fd5922 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -17,6 +17,7 @@ */ use chrono::{TimeDelta, Timelike}; +use futures::FutureExt; use std::collections::HashMap; use std::future::Future; use std::panic::AssertUnwindSafe; @@ -119,7 +120,7 @@ pub fn object_store_sync() -> ( info!("Object store sync task started"); let mut inbox_rx = inbox_rx; - let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { + let result = tokio::spawn(async move { let mut sync_interval = interval_at(next_minute(), STORAGE_UPLOAD_INTERVAL); loop { @@ -153,11 +154,13 @@ pub fn object_store_sync() -> ( } } } - })); + }); - match result { - Ok(future) => { - future.await; + match AssertUnwindSafe(result).catch_unwind().await { + Ok(join_result) => { + if let Err(join_err) = join_result { + error!("Panic in object store sync task: {join_err:?}"); + } } Err(panic_error) => { error!("Panic in object store sync task: {panic_error:?}"); @@ -184,12 +187,11 @@ pub fn local_sync() -> ( info!("Local sync task started"); let mut inbox_rx = inbox_rx; - let result = std::panic::catch_unwind(AssertUnwindSafe(|| async move { + let result = tokio::spawn(async move { let mut sync_interval = interval_at(next_minute(), LOCAL_SYNC_INTERVAL); loop { select! { - // Spawns a flush+conversion task every `LOCAL_SYNC_INTERVAL` seconds _ = sync_interval.tick() => { // Monitor the duration of flush_and_convert execution monitor_task_duration( @@ -217,11 +219,13 @@ pub fn local_sync() -> ( } } } - })); + }); - match result { - Ok(future) => { - future.await; + match AssertUnwindSafe(result).catch_unwind().await { + Ok(join_result) => { + if let Err(join_err) = join_result { + error!("Panic in local sync task: {join_err:?}"); + } } Err(panic_error) => { error!("Panic in local sync task: {panic_error:?}"); From 370f563b5672e06489b70e320b48ced6601da0a6 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 01:22:29 -0400 Subject: [PATCH 5/8] refactor and fmt --- src/otel/traces.rs | 155 +++++++++++++++++++++++++++++---------- src/parseable/streams.rs | 5 +- 2 files changed, 119 insertions(+), 41 deletions(-) diff --git a/src/otel/traces.rs b/src/otel/traces.rs index d62efdab3..12c6e350f 100644 --- a/src/otel/traces.rs +++ b/src/otel/traces.rs @@ -336,7 +336,6 @@ fn flatten_span_record(span_record: &Span) -> Vec> { span_records_json } - #[cfg(test)] mod tests { use super::*; @@ -360,13 +359,21 @@ mod tests { KeyValue { key: "service.name".to_string(), value: Some(AnyValue { - value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("test-service".to_string())), + value: Some( + opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue( + "test-service".to_string(), + ), + ), }), }, KeyValue { key: "http.method".to_string(), value: Some(AnyValue { - value: Some(opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue("GET".to_string())), + value: Some( + opentelemetry_proto::tonic::common::v1::any_value::Value::StringValue( + "GET".to_string(), + ), + ), }), }, ] @@ -398,7 +405,8 @@ mod tests { assert_eq!( 
result.get("span_status_description").unwrap(), &Value::String(expected_description.to_string()), - "Status description should match expected value for code {}", code + "Status description should match expected value for code {}", + code ); assert_eq!( result.get("span_status_message").unwrap(), @@ -432,7 +440,8 @@ mod tests { assert_eq!( result.get("span_kind_description").unwrap(), &Value::String(expected_description.to_string()), - "Span kind description should match expected value for kind {}", kind + "Span kind description should match expected value for kind {}", + kind ); } } @@ -459,7 +468,8 @@ mod tests { assert_eq!( result.get("span_flags_description").unwrap(), &Value::String(expected_description.to_string()), - "Span flags description should match expected value for flags {}", flags + "Span flags description should match expected value for flags {}", + flags ); } } @@ -488,7 +498,10 @@ mod tests { // Check first event let first_event = &result[0]; - assert!(first_event.contains_key("event_time_unix_nano"), "Should contain timestamp"); + assert!( + first_event.contains_key("event_time_unix_nano"), + "Should contain timestamp" + ); assert_eq!( first_event.get("event_name").unwrap(), &Value::String("request.start".to_string()), @@ -499,7 +512,10 @@ mod tests { &Value::Number(2.into()), "Dropped attributes count should be preserved" ); - assert!(first_event.contains_key("service.name"), "Should contain flattened attributes"); + assert!( + first_event.contains_key("service.name"), + "Should contain flattened attributes" + ); // Check second event let second_event = &result[1]; @@ -518,16 +534,14 @@ mod tests { #[test] fn test_flatten_links_structure() { // Test that links are properly flattened with all expected fields - let links = vec![ - Link { - trace_id: sample_trace_id(), - span_id: sample_span_id(), - trace_state: "state1".to_string(), - attributes: sample_attributes(), - dropped_attributes_count: 1, - flags: 0, - }, - ]; + let links = vec![Link { + trace_id: sample_trace_id(), + span_id: sample_span_id(), + trace_state: "state1".to_string(), + attributes: sample_attributes(), + dropped_attributes_count: 1, + flags: 0, + }]; let result = flatten_links(&links); @@ -549,7 +563,10 @@ mod tests { &Value::Number(1.into()), "Dropped attributes count should be preserved" ); - assert!(link.contains_key("service.name"), "Should contain flattened attributes"); + assert!( + link.contains_key("service.name"), + "Should contain flattened attributes" + ); } #[test] @@ -611,12 +628,30 @@ mod tests { &Value::String("SPAN_KIND_SERVER".to_string()), "All records should contain span kind description" ); - assert!(record.contains_key("span_trace_id"), "Should contain trace ID"); - assert!(record.contains_key("span_span_id"), "Should contain span ID"); - assert!(record.contains_key("span_start_time_unix_nano"), "Should contain start time"); - assert!(record.contains_key("span_end_time_unix_nano"), "Should contain end time"); - assert!(record.contains_key("service.name"), "Should contain span attributes"); - assert!(record.contains_key("span_status_code"), "Should contain status"); + assert!( + record.contains_key("span_trace_id"), + "Should contain trace ID" + ); + assert!( + record.contains_key("span_span_id"), + "Should contain span ID" + ); + assert!( + record.contains_key("span_start_time_unix_nano"), + "Should contain start time" + ); + assert!( + record.contains_key("span_end_time_unix_nano"), + "Should contain end time" + ); + assert!( + record.contains_key("service.name"), + "Should 
contain span attributes" + ); + assert!( + record.contains_key("span_status_code"), + "Should contain status" + ); } // One record should be an event, one should be a link @@ -650,7 +685,11 @@ mod tests { let result = flatten_span_record(&span); - assert_eq!(result.len(), 1, "Should have exactly one record for span without events/links"); + assert_eq!( + result.len(), + 1, + "Should have exactly one record for span without events/links" + ); let record = &result[0]; assert_eq!( @@ -658,9 +697,18 @@ mod tests { &Value::String("simple-span".to_string()), "Should contain span name" ); - assert!(!record.contains_key("event_name"), "Should not contain event fields"); - assert!(!record.contains_key("link_trace_id"), "Should not contain link fields"); - assert!(!record.contains_key("span_status_code"), "Should not contain status when none provided"); + assert!( + !record.contains_key("event_name"), + "Should not contain event fields" + ); + assert!( + !record.contains_key("link_trace_id"), + "Should not contain link fields" + ); + assert!( + !record.contains_key("span_status_code"), + "Should not contain status when none provided" + ); } #[test] @@ -705,10 +753,16 @@ mod tests { assert_eq!(hex_span_id, "12345678", "Span ID should be lowercase hex"); } if let Some(Value::String(hex_parent_span_id)) = record.get("span_parent_span_id") { - assert_eq!(hex_parent_span_id, "87654321", "Parent span ID should be lowercase hex"); + assert_eq!( + hex_parent_span_id, "87654321", + "Parent span ID should be lowercase hex" + ); } if let Some(Value::String(link_trace_id)) = record.get("link_trace_id") { - assert_eq!(link_trace_id, "ffabcdef", "Link trace ID should be lowercase hex"); + assert_eq!( + link_trace_id, "ffabcdef", + "Link trace ID should be lowercase hex" + ); } } } @@ -823,15 +877,36 @@ mod tests { fn test_known_field_list_completeness() { // Test that the OTEL_TRACES_KNOWN_FIELD_LIST contains all expected fields let expected_fields = [ - "scope_name", "scope_version", "scope_schema_url", "scope_dropped_attributes_count", - "resource_schema_url", "resource_dropped_attributes_count", - "span_trace_id", "span_span_id", "span_name", "span_parent_span_id", "name", - "span_kind", "span_kind_description", "span_start_time_unix_nano", "span_end_time_unix_nano", - "event_name", "event_time_unix_nano", "event_dropped_attributes_count", - "link_span_id", "link_trace_id", "link_dropped_attributes_count", - "span_dropped_events_count", "span_dropped_links_count", "span_dropped_attributes_count", - "span_trace_state", "span_flags", "span_flags_description", - "span_status_code", "span_status_description", "span_status_message", + "scope_name", + "scope_version", + "scope_schema_url", + "scope_dropped_attributes_count", + "resource_schema_url", + "resource_dropped_attributes_count", + "span_trace_id", + "span_span_id", + "span_name", + "span_parent_span_id", + "name", + "span_kind", + "span_kind_description", + "span_start_time_unix_nano", + "span_end_time_unix_nano", + "event_name", + "event_time_unix_nano", + "event_dropped_attributes_count", + "link_span_id", + "link_trace_id", + "link_dropped_attributes_count", + "span_dropped_events_count", + "span_dropped_links_count", + "span_dropped_attributes_count", + "span_trace_state", + "span_flags", + "span_flags_description", + "span_status_code", + "span_status_description", + "span_status_message", ]; assert_eq!( diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index dc8ed1a4b..06e00ddaa 100644 --- a/src/parseable/streams.rs +++ 
b/src/parseable/streams.rs @@ -74,7 +74,10 @@ fn arrow_path_to_parquet( ) -> Option { let filename = path.file_stem()?.to_str()?; let (_, front) = filename.split_once('.')?; - assert!(front.contains('.'), "contains the delim `.`"); + if !front.contains('.') { + warn!("Skipping unexpected arrow file without `.`: {}", filename); + return None; + } let filename_with_random_number = format!("{front}.{random_string}.parquet"); let mut parquet_path = stream_staging_path.to_owned(); parquet_path.push(filename_with_random_number); From 70ca26db12636f174abbbd69362779deb1de6d00 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 05:41:44 -0400 Subject: [PATCH 6/8] rename folder to processing --- src/parseable/streams.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 06e00ddaa..2ed089145 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -66,6 +66,8 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; +const INPROCESS_DIR_PREFIX: &str = "processing_"; + /// Returns the filename for parquet if provided arrows file path is valid as per our expectation fn arrow_path_to_parquet( stream_staging_path: &Path, @@ -228,7 +230,12 @@ impl Stream { //iterate through all the inprocess_ directories and collect all arrow files dir.filter_map(|entry| { let path = entry.ok()?.path(); - if path.is_dir() && path.file_name()?.to_str()?.starts_with("inprocess_") { + if path.is_dir() + && path + .file_name()? + .to_str()? + .starts_with(INPROCESS_DIR_PREFIX) + { Some(path) } else { None @@ -399,7 +406,7 @@ impl Stream { } fn inprocess_folder(base: &Path, minute: u128) -> PathBuf { - base.join(format!("inprocess_{}", minute)) + base.join(format!("{INPROCESS_DIR_PREFIX}{minute}")) } pub fn parquet_files(&self) -> Vec { From 1d92e04e2508651456cb15f6ef9d397bc364e5aa Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 05:51:10 -0400 Subject: [PATCH 7/8] init sync for ingest server --- src/handlers/http/modal/ingest_server.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index e9751ee13..351cad896 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -31,6 +31,7 @@ use tokio::sync::oneshot; use tokio::sync::OnceCell; use crate::handlers::http::modal::NodeType; +use crate::sync::sync_start; use crate::{ analytics, handlers::{ @@ -114,6 +115,13 @@ impl ParseableServer for IngestServer { migration::run_migration(&PARSEABLE).await?; + // local sync on init + tokio::spawn(async { + if let Err(e) = sync_start().await { + tracing::warn!("local sync on server start failed: {e}"); + } + }); + // Run sync on a background thread let (cancel_tx, cancel_rx) = oneshot::channel(); thread::spawn(|| sync::handler(cancel_rx)); From f020c6b89a5dbb5c79523e09e0702e6eafa163e5 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 16 Jun 2025 06:22:49 -0400 Subject: [PATCH 8/8] ensure init sync completion --- src/handlers/http/modal/ingest_server.rs | 6 ++++-- src/handlers/http/modal/query_server.rs | 6 ++++-- src/handlers/http/modal/server.rs | 6 ++++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 351cad896..8a8e1d1b1 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -116,7 +116,7 @@ impl ParseableServer for 
IngestServer { migration::run_migration(&PARSEABLE).await?; // local sync on init - tokio::spawn(async { + let startup_sync_handle = tokio::spawn(async { if let Err(e) = sync_start().await { tracing::warn!("local sync on server start failed: {e}"); } @@ -132,7 +132,9 @@ impl ParseableServer for IngestServer { let result = self.start(shutdown_rx, prometheus.clone(), None).await; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - + if let Err(join_err) = startup_sync_handle.await { + tracing::warn!("startup sync task panicked: {join_err}"); + } result } } diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index e8a256925..b43fa68a9 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -129,7 +129,7 @@ impl ParseableServer for QueryServer { } // local sync on init - tokio::spawn(async { + let startup_sync_handle = tokio::spawn(async { if let Err(e) = sync_start().await { tracing::warn!("local sync on server start failed: {e}"); } @@ -150,7 +150,9 @@ impl ParseableServer for QueryServer { .await?; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - + if let Err(join_err) = startup_sync_handle.await { + tracing::warn!("startup sync task panicked: {join_err}"); + } Ok(result) } } diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 64b70b46f..d22e5de02 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -124,7 +124,7 @@ impl ParseableServer for Server { storage::retention::load_retention_from_global(); // local sync on init - tokio::spawn(async { + let startup_sync_handle = tokio::spawn(async { if let Err(e) = sync_start().await { tracing::warn!("local sync on server start failed: {e}"); } @@ -150,7 +150,9 @@ impl ParseableServer for Server { .await; // Cancel sync jobs cancel_tx.send(()).expect("Cancellation should not fail"); - + if let Err(join_err) = startup_sync_handle.await { + tracing::warn!("startup sync task panicked: {join_err}"); + } return result; } }
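
The panic-handling change in PATCH 4/8 moves the sync loops from `std::panic::catch_unwind` around an async closure (which only guarded construction of the future, not its polling) to `tokio::spawn` plus `catch_unwind` on the awaited `JoinHandle`, so a panic inside the loop surfaces as a `JoinError` instead of unwinding into the caller. A minimal sketch of that pattern, with the panicking block standing in for the real sync loop (only `tokio` and `futures` are assumed):

    use futures::FutureExt;
    use std::panic::AssertUnwindSafe;

    #[tokio::main]
    async fn main() {
        // Run the fallible work on its own task; tokio converts a panic inside
        // the task into a JoinError on the handle instead of aborting the caller.
        let handle = tokio::spawn(async {
            panic!("simulated panic inside the sync loop");
        });

        // catch_unwind on the awaited handle is the extra guard the patch keeps:
        // Ok(Err(_)) is the panicked-task case, Err(_) a panic while joining.
        match AssertUnwindSafe(handle).catch_unwind().await {
            Ok(Ok(())) => println!("sync task finished normally"),
            Ok(Err(join_err)) => eprintln!("Panic in sync task: {join_err:?}"),
            Err(panic_err) => eprintln!("Panic while awaiting sync task: {panic_err:?}"),
        }
    }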
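
In the src/parseable/streams.rs hunk of PATCH 5/8, `arrow_path_to_parquet` stops `assert!`-ing on the file stem and instead logs and returns `None`, so one malformed arrow filename can no longer panic the conversion path. The check reduced to a free function over the stem; the example stems and the `eprintln!` standing in for `warn!` are illustrative only:

    /// Mirrors the patched check: drop everything up to the first `.`, then
    /// require the remainder to contain another `.` before building the name.
    fn parquet_name_from_stem(stem: &str, random_string: &str) -> Option<String> {
        let (_, front) = stem.split_once('.')?;
        if !front.contains('.') {
            // Previously an assert!; malformed names are now skipped, not fatal.
            eprintln!("Skipping unexpected arrow file without `.`: {stem}");
            return None;
        }
        Some(format!("{front}.{random_string}.parquet"))
    }

    fn main() {
        // A stem with the expected extra delimiters maps to a parquet name...
        assert_eq!(
            parquet_name_from_stem("abc.date=2025-06-16.hour=05", "rand01"),
            Some("date=2025-06-16.hour=05.rand01.parquet".to_string())
        );
        // ...while a stem without them is skipped instead of panicking.
        assert_eq!(parquet_name_from_stem("abc.nodelim", "rand01"), None);
    }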
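
PATCH 6/8 centralises the in-process directory name in an `INPROCESS_DIR_PREFIX` constant ("processing_") and reuses it both when building the per-minute folder and when scanning for pending folders. A compact sketch of those two uses; the base path and minute value below are made up for illustration:

    use std::path::{Path, PathBuf};

    const INPROCESS_DIR_PREFIX: &str = "processing_";

    // Per-minute in-process folder, as in the patched `inprocess_folder`.
    fn inprocess_folder(base: &Path, minute: u128) -> PathBuf {
        base.join(format!("{INPROCESS_DIR_PREFIX}{minute}"))
    }

    // Directories under `base` carrying the prefix, the same filter the
    // pending-file scan applies (unreadable entries are silently skipped).
    fn inprocess_dirs(base: &Path) -> Vec<PathBuf> {
        std::fs::read_dir(base)
            .into_iter()
            .flatten()
            .filter_map(|entry| {
                let path = entry.ok()?.path();
                (path.is_dir()
                    && path.file_name()?.to_str()?.starts_with(INPROCESS_DIR_PREFIX))
                .then_some(path)
            })
            .collect()
    }

    fn main() {
        println!("{}", inprocess_folder(Path::new("/tmp/staging"), 42).display());
        println!("{:?}", inprocess_dirs(Path::new("/tmp/staging")));
    }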
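
PATCH 8/8 turns the fire-and-forget startup `tokio::spawn` of `sync_start()` into a stored `JoinHandle` that each server awaits once its main loop returns, so a panicked startup sync is logged rather than silently dropped. The shape of that change in isolation; `sync_start` and `serve` here are hypothetical stand-ins, not the Parseable functions:

    use tokio::time::{sleep, Duration};

    // Stand-in for the startup sync done by `sync_start()`.
    async fn sync_start() -> Result<(), String> {
        sleep(Duration::from_millis(50)).await;
        Ok(())
    }

    // Stand-in for the server's main loop, which runs until shutdown.
    async fn serve() {
        sleep(Duration::from_millis(100)).await;
    }

    #[tokio::main]
    async fn main() {
        // Keep the handle instead of discarding the spawned task.
        let startup_sync_handle = tokio::spawn(async {
            if let Err(e) = sync_start().await {
                eprintln!("local sync on server start failed: {e}");
            }
        });

        serve().await;

        // Await the handle before returning so a panic in the startup sync
        // is reported instead of vanishing with the runtime.
        if let Err(join_err) = startup_sync_handle.await {
            eprintln!("startup sync task panicked: {join_err}");
        }
    }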