From 1af17bc4a6fe23586c422f93e84111a07c51304b Mon Sep 17 00:00:00 2001
From: Satyam Singh
Date: Thu, 8 Sep 2022 13:39:20 +0530
Subject: [PATCH] Append arrow directly to local data

Currently any event that arrives is written directly to data.parquet.
This is easier to implement, but it ultimately slows down log ingestion.
To make ingestion faster, a stream of record batches would act as an
ideal buffer. Appending arrow record batches directly to a stream file
is the faster way to go about it, but it has its own challenges:
- Querying in real time would require keeping these record batches in
  memory.
- Appending record batches to an append-only file from multiple threads
  is not supported by Arrow StreamWriter. Locking is used for now.
- Re-implementation of stats is required.
- The state of local storage cannot be fully guaranteed if the process
  fails exactly at the move step of local sync. Orphan tmp files can
  occur, although that would be very rare.
---
 server/src/event.rs              | 212 ++++++++++++++++++++-----------
 server/src/handlers/logstream.rs |  23 ++--
 server/src/main.rs               |  15 ++-
 server/src/storage.rs            | 125 ++++++++++++++----
 4 files changed, 257 insertions(+), 118 deletions(-)

diff --git a/server/src/event.rs b/server/src/event.rs
index 4bcb8beda..80acdf050 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -18,23 +18,143 @@
  */
 use datafusion::arrow;
 use datafusion::arrow::datatypes::Schema;
+use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::json;
 use datafusion::arrow::json::reader::infer_json_schema;
 use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
-use datafusion::parquet::file::properties::WriterProperties;
-use datafusion::parquet::file::serialized_reader::SerializedFileReader;
+use lazy_static::lazy_static;
 use log::error;
-use std::fs;
+use std::collections::HashMap;
+use std::fs::OpenOptions;
 use std::io::BufReader;
 use std::sync::Arc;
+use std::sync::Mutex;
+use std::sync::RwLock;
 
 use crate::metadata;
+use crate::metadata::STREAM_INFO;
 use crate::option::CONFIG;
 use crate::response;
 use crate::storage::ObjectStorage;
 use crate::Error;
 
+type LocalWriter<W> = Mutex<Option<StreamWriter<W>>>;
+
+lazy_static!
{ + #[derive(Default)] + pub static ref STREAM_WRITERS: RwLock> = RwLock::new(HashMap::new()); +} + +impl STREAM_WRITERS { + // append to a existing stream + fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), ()> { + let hashmap_guard = STREAM_WRITERS.read().unwrap(); + match hashmap_guard.get(stream) { + Some(localwriter) => { + let mut writer_guard = localwriter.lock().unwrap(); + if let Some(ref mut writer) = *writer_guard { + writer.write(record).map_err(|_| ())?; + } else { + drop(writer_guard); + drop(hashmap_guard); + STREAM_WRITERS::set_entry(stream, record).unwrap(); + } + } + None => { + drop(hashmap_guard); + STREAM_WRITERS::create_entry(stream.to_string(), record).unwrap(); + } + }; + Ok(()) + } + + // create a new entry with new stream_writer + // todo: error type + // Only create entry for valid streams + fn create_entry(stream: String, record: &RecordBatch) -> Result<(), ()> { + let mut hashmap_guard = STREAM_WRITERS.write().unwrap(); + + if STREAM_INFO.schema(&stream).is_err() { + return Err(()); + } + + let file = OpenOptions::new() + .append(true) + .create_new(true) + .open(data_file_path(&stream)) + .map_err(|_| ())?; + + let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?; + stream_writer.write(record).map_err(|_| ())?; + + hashmap_guard.insert(stream, Mutex::new(Some(stream_writer))); + + Ok(()) + } + + // Deleting a logstream requires that metadata is deleted first + pub fn delete_entry(stream: &str) -> Result<(), ()> { + let mut hashmap_guard = STREAM_WRITERS.write().unwrap(); + + if STREAM_INFO.schema(stream).is_ok() { + return Err(()); + } + + hashmap_guard.remove(stream); + + Ok(()) + } + + fn set_entry(stream: &str, record: &RecordBatch) -> Result<(), ()> { + let file = OpenOptions::new() + .append(true) + .create_new(true) + .open(data_file_path(stream)) + .map_err(|_| ())?; + + let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?; + stream_writer.write(record).map_err(|_| ())?; + + STREAM_WRITERS + .read() + .expect("Current Thread should not hold any lock") + .get(stream) + .expect("set entry is only called on valid entries") + .lock() + .expect("Poisioning is not handled yet") + .replace(stream_writer); // replace the stream writer behind this mutex + + Ok(()) + } + + // Unset the entry so that + pub fn unset_entry(stream: &str) { + let guard = STREAM_WRITERS.read().unwrap(); + let stream_writer = match guard.get(stream) { + Some(writer) => writer, + None => return, + }; + stream_writer + .lock() + .expect("Poisioning is not handled yet") + .take(); + } +} + +#[derive(Debug, thiserror::Error)] +enum StreamWriterError {} + +fn data_file_path(stream_name: &str) -> String { + format!( + "{}/{}", + CONFIG + .parseable + .local_stream_data_path(stream_name) + .to_string_lossy(), + "data.records" + ) +} + #[derive(Clone)] pub struct Event { pub body: String, @@ -44,17 +164,6 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { - fn data_file_path(&self) -> String { - format!( - "{}/{}", - CONFIG - .parseable - .local_stream_data_path(&self.stream_name) - .to_string_lossy(), - "data.parquet" - ) - } - pub async fn process( &self, storage: &impl ObjectStorage, @@ -65,12 +174,11 @@ impl Event { })?; let event = self.get_reader(inferred_schema.clone()); - let size = self.body_size(); let stream_schema = metadata::STREAM_INFO.schema(&self.stream_name)?; let is_first_event = stream_schema.is_none(); - let compressed_size = if 
let Some(existing_schema) = stream_schema { + if let Some(existing_schema) = stream_schema { // validate schema before processing the event if existing_schema != inferred_schema { return Err(Error::SchemaMismatch(self.stream_name.clone())); @@ -84,11 +192,6 @@ impl Event { .await? }; - if let Err(e) = metadata::STREAM_INFO.update_stats(&self.stream_name, size, compressed_size) - { - error!("Couldn't update stream stats. {:?}", e); - } - if let Err(e) = metadata::STREAM_INFO.check_alerts(self).await { error!("Error checking for alerts. {:?}", e); } @@ -115,59 +218,44 @@ impl Event { storage: &impl ObjectStorage, ) -> Result { let rb = event.next()?.ok_or(Error::MissingRecord)?; + let stream_name = &self.stream_name; - // Store record batch to Parquet file on local cache - let compressed_size = self.convert_arrow_parquet(rb)?; + // Store record batch on local cache + STREAM_WRITERS::create_entry(stream_name.clone(), &rb).unwrap(); // Put the inferred schema to object store - let stream_name = &self.stream_name; - storage .put_schema(stream_name.clone(), &schema) .await .map_err(|e| response::EventError { msg: format!( "Failed to upload schema for log stream {} due to err: {}", - self.stream_name, e + stream_name, e ), })?; // set the schema in memory for this stream metadata::STREAM_INFO - .set_schema(&self.stream_name, schema) + .set_schema(stream_name, schema) .map_err(|e| response::EventError { msg: format!( "Failed to set schema for log stream {} due to err: {}", - &self.stream_name, e + stream_name, e ), })?; - Ok(compressed_size) + Ok(0) } // event process all events after the 1st event. Concatenates record batches // and puts them in memory store for each event. fn process_event(&self, mut event: json::Reader) -> Result { - let next_event_rb = event.next()?.ok_or(Error::MissingRecord)?; - - let compressed_size = match self.convert_parquet_rb_reader() { - Ok(mut arrow_reader) => { - let mut total_size = 0; - let rb = arrow_reader.get_record_reader(2048).unwrap(); - for prev_rb in rb { - let new_rb = RecordBatch::concat( - &std::sync::Arc::new(arrow_reader.get_schema().unwrap()), - &[next_event_rb.clone(), prev_rb.unwrap()], - )?; - total_size += self.convert_arrow_parquet(new_rb)?; - } + let rb = event.next()?.ok_or(Error::MissingRecord)?; + let stream_name = &self.stream_name; - total_size - } - Err(_) => self.convert_arrow_parquet(next_event_rb)?, - }; + STREAM_WRITERS::append_to_local(stream_name, &rb).map_err(|_| Error::MissingRecord)?; - Ok(compressed_size) + Ok(0) } // inferSchema is a constructor to Schema @@ -187,32 +275,4 @@ impl Event { json::reader::DecoderOptions::new().with_batch_size(1024), ) } - - fn body_size(&self) -> u64 { - self.body.as_bytes().len() as u64 - } - - // convert arrow record batch to parquet - // and write it to local cache path as a data.parquet file. 
- fn convert_arrow_parquet(&self, rb: RecordBatch) -> Result { - let parquet_path = self.data_file_path(); - let parquet_file = fs::File::create(&parquet_path)?; - let props = WriterProperties::builder().build(); - let mut writer = - ArrowWriter::try_new(parquet_file, Arc::new(self.infer_schema()?), Some(props))?; - writer.write(&rb)?; - writer.close()?; - - let compressed_size = fs::metadata(parquet_path)?.len(); - - Ok(compressed_size) - } - - pub fn convert_parquet_rb_reader(&self) -> Result { - let file = fs::File::open(&self.data_file_path())?; - let file_reader = SerializedFileReader::new(file)?; - let arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); - - Ok(arrow_reader) - } } diff --git a/server/src/handlers/logstream.rs b/server/src/handlers/logstream.rs index 288e979c7..ff0b59af0 100644 --- a/server/src/handlers/logstream.rs +++ b/server/src/handlers/logstream.rs @@ -22,9 +22,9 @@ use actix_web::http::StatusCode; use actix_web::{web, HttpRequest, HttpResponse, Responder}; use crate::alerts::Alerts; -use crate::response; use crate::s3::S3; use crate::storage::{ObjectStorage, StorageDir}; +use crate::{event, response}; use crate::{metadata, validator}; pub async fn delete(req: HttpRequest) -> HttpResponse { @@ -59,20 +59,27 @@ pub async fn delete(req: HttpRequest) -> HttpResponse { .to_http(); } - let stream_dir = StorageDir::new(&stream_name); - if fs::remove_dir_all(&stream_dir.data_path).is_err() { + if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) { log::warn!( - "failed to delete local data for stream {}. Clean {} manually", + "failed to delete log stream {} from metadata due to err: {}", stream_name, - stream_dir.data_path.to_string_lossy() + e ) } - if let Err(e) = metadata::STREAM_INFO.delete_stream(&stream_name) { + if event::STREAM_WRITERS::delete_entry(&stream_name).is_err() { log::warn!( - "failed to delete log stream {} from metadata due to err: {}", + "failed to delete log stream event writers for stream {}", + stream_name + ) + } + + let stream_dir = StorageDir::new(stream_name.clone()); + if fs::remove_dir_all(&stream_dir.data_path).is_err() { + log::warn!( + "failed to delete local data for stream {}. Clean {} manually", stream_name, - e + stream_dir.data_path.to_string_lossy() ) } diff --git a/server/src/main.rs b/server/src/main.rs index 11a16775f..92faffa21 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -73,7 +73,7 @@ async fn main() -> anyhow::Result<()> { warn!("could not populate local metadata. 
{:?}", e); } - // Move all exiting data.parquet file to their respective tmp directory + // Move all exiting data.records file to their respective tmp directory // they will be synced to object store on next s3 sync cycle startup_sync(); @@ -112,11 +112,12 @@ fn startup_sync() { } for stream in metadata::STREAM_INFO.list_streams() { - let dir = StorageDir::new(&stream); - // if data.parquet file is not present then skip this stream - if !dir.parquet_path_exists() { + let dir = StorageDir::new(stream.clone()); + // if data.records file is not present then skip this stream + if !dir.local_data_exists() { continue; } + if let Err(e) = dir.create_temp_dir() { log::error!( "Error creating tmp directory for {} due to error [{}]", @@ -126,7 +127,7 @@ fn startup_sync() { continue; } // create prefix for this file from its last modified time - let path = dir.data_path.join("data.parquet"); + let path = dir.data_path.join("data.records"); // metadata.modified gives us system time // This may not work on all platfomns @@ -156,7 +157,7 @@ fn startup_sync() { let local_uri = str::replace(&uri, "/", "."); let hostname = utils::hostname_unchecked(); let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname); - if let Err(err) = dir.move_parquet_to_temp(parquet_file_local) { + if let Err(err) = dir.move_local_to_temp(parquet_file_local) { log::warn!( "Failed to move parquet file at {} to tmp directory due to error {}", path.display(), @@ -269,7 +270,7 @@ async fn run_http() -> anyhow::Result<()> { (_, _) => None, }; - let http_server = HttpServer::new(move || create_app!()); + let http_server = HttpServer::new(move || create_app!()).workers(num_cpus::get() - 1); if let Some(builder) = ssl_acceptor { http_server .bind_openssl(&CONFIG.parseable.address, builder)? diff --git a/server/src/storage.rs b/server/src/storage.rs index 153ac3417..103fb8e4a 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -20,16 +20,21 @@ use crate::alerts::Alerts; use crate::metadata::{Stats, STREAM_INFO}; use crate::option::CONFIG; use crate::query::Query; -use crate::utils; +use crate::{event, utils}; use async_trait::async_trait; use chrono::{Duration, Timelike, Utc}; use datafusion::arrow::datatypes::Schema; +use datafusion::arrow::error::ArrowError; +use datafusion::arrow::ipc::reader::StreamReader; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::parquet::arrow::ArrowWriter; +use datafusion::parquet::errors::ParquetError; +use datafusion::parquet::file::properties::WriterProperties; use serde::Serialize; use std::fmt::Debug; -use std::fs; +use std::fs::{self, File}; use std::io; use std::iter::Iterator; use std::path::{Path, PathBuf}; @@ -37,7 +42,7 @@ use std::path::{Path, PathBuf}; extern crate walkdir; use walkdir::WalkDir; -/// local sync interval to move data.parquet to /tmp dir of that stream. +/// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. 
pub const LOCAL_SYNC_INTERVAL: u64 = 60; @@ -79,10 +84,10 @@ pub trait ObjectStorage: Sync + 'static { // entries here means all the streams present on local disk for stream in streams { - let sync = StorageSync::new(&stream); + let sync = StorageSync::new(stream.clone()); - // if data.parquet file not present, skip this stream - if !sync.dir.parquet_path_exists() { + // if data.records file not present, skip this stream + if !sync.dir.local_data_exists() { continue; } @@ -95,14 +100,21 @@ pub trait ObjectStorage: Sync + 'static { continue; } - if let Err(e) = sync.move_parquet_to_temp() { - log::error!( - "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]", - sync.dir.data_path.to_string_lossy(), - sync.dir.temp_dir.to_string_lossy(), - e - ); - continue; + match sync.move_local_to_temp() { + Ok(parquet_size) => { + if let Err(e) = STREAM_INFO.update_stats(&stream, 0, parquet_size) { + log::error!("Couldn't update stream stats. {:?}", e); + } + } + Err(e) => { + log::error!( + "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]", + sync.dir.data_path.to_string_lossy(), + sync.dir.temp_dir.to_string_lossy(), + e + ); + continue; + } } } @@ -117,7 +129,7 @@ pub trait ObjectStorage: Sync + 'static { let streams = STREAM_INFO.list_streams(); for stream in streams { - let dir = StorageDir::new(&stream); + let dir = StorageDir::new(stream.clone()); for file in WalkDir::new(dir.temp_dir) .min_depth(1) @@ -126,6 +138,14 @@ pub trait ObjectStorage: Sync + 'static { .filter_map(|file| file.ok()) .map(|file| file.path().to_path_buf()) .filter(|file| file.is_file()) + .filter(|file| { + let is_tmp = match file.extension() { + Some(ext) => ext.eq_ignore_ascii_case("tmp"), + None => false, + }; + + !is_tmp + }) { let filename = file.file_name().unwrap().to_str().unwrap(); let file_suffix = str::replacen(filename, ".", "/", 3); @@ -152,50 +172,101 @@ pub struct LogStream { #[derive(Debug)] pub struct StorageDir { + pub stream_name: String, pub data_path: PathBuf, pub temp_dir: PathBuf, } +// Storage Dir is a type which can move files form datapath to temp dir impl StorageDir { - pub fn new(stream_name: &str) -> Self { - let data_path = CONFIG.parseable.local_stream_data_path(stream_name); + pub fn new(stream_name: String) -> Self { + let data_path = CONFIG.parseable.local_stream_data_path(&stream_name); let temp_dir = data_path.join("tmp"); Self { + stream_name, data_path, temp_dir, } } + // create tmp dir if it does not exist pub fn create_temp_dir(&self) -> io::Result<()> { fs::create_dir_all(&self.temp_dir) } - pub fn move_parquet_to_temp(&self, filename: String) -> io::Result<()> { - fs::rename( - self.data_path.join("data.parquet"), - self.temp_dir.join(filename), - ) + pub fn move_local_to_temp(&self, filename: String) -> Result { + let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp"); + fs::rename(self.data_path.join("data.records"), &record_tmp_file_path) + .map_err(|_| MoveDataError::Rename)?; + event::STREAM_WRITERS::unset_entry(&self.stream_name); + let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?; + let reader = StreamReader::try_new(file, None)?; + let schema = reader.schema(); + let records = reader.filter_map(|record| match record { + Ok(record) => Some(record), + Err(e) => { + log::warn!("error when reading from arrow stream {:?}", e); + None + } + }); + + let parquet_path = self.temp_dir.join(filename); + let parquet_file = 
fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?; + let props = WriterProperties::builder().build(); + let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?; + + for ref record in records { + writer.write(record)?; + } + + writer.close()?; + + fs::remove_file(record_tmp_file_path).map_err(|_| MoveDataError::Delete)?; + + let compressed_size = fs::metadata(parquet_path) + .map_err(|_| MoveDataError::Metadata)? + .len(); + + Ok(compressed_size) } - pub fn parquet_path_exists(&self) -> bool { - self.data_path.join("data.parquet").exists() + pub fn local_data_exists(&self) -> bool { + self.data_path.join("data.records").exists() } } +#[derive(Debug, thiserror::Error)] +pub enum MoveDataError { + #[error("Failed to rename file")] + Rename, + #[error("Unable to Open file after moving")] + Open, + #[error("Unable to create recordbatch stream")] + Arrow(#[from] ArrowError), + #[error("Could not generate parquet file")] + Parquet(#[from] ParquetError), + #[error("Could not generate parquet file")] + Create, + #[error("Could not delete temp arrow file")] + Delete, + #[error("Could not fetch metadata of moved parquet file")] + Metadata, +} + struct StorageSync { pub dir: StorageDir, time: chrono::DateTime, } impl StorageSync { - fn new(stream_name: &str) -> Self { + fn new(stream_name: String) -> Self { let dir = StorageDir::new(stream_name); let time = Utc::now(); Self { dir, time } } - fn move_parquet_to_temp(&self) -> io::Result<()> { + fn move_local_to_temp(&self) -> Result { let time = self.time - Duration::minutes(OBJECT_STORE_DATA_GRANULARITY as i64); let uri = utils::date_to_prefix(time.date()) + &utils::hour_to_prefix(time.hour()) @@ -203,7 +274,7 @@ impl StorageSync { let local_uri = str::replace(&uri, "/", "."); let hostname = utils::hostname_unchecked(); let parquet_file_local = format!("{}{}.data.parquet", local_uri, hostname); - self.dir.move_parquet_to_temp(parquet_file_local) + self.dir.move_local_to_temp(parquet_file_local) } }
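
Note (illustrative sketch, not part of the diff): the snippet below shows the buffering idea from the commit message in isolation: several ingest threads append Arrow record batches to a single on-disk IPC stream through a Mutex-guarded StreamWriter. It uses the arrow crate paths directly, while the patch reaches the same types through datafusion::arrow; the data.records file name follows the patch, but the two example columns and the rest of the scaffolding are assumptions made for the sketch, and the per-stream HashMap, schema validation, and error handling of STREAM_WRITERS are omitted.

use std::fs::File;
use std::sync::{Arc, Mutex};
use std::thread;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Schema shared by every batch appended to this stream (example columns).
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("msg", DataType::Utf8, false),
    ]));

    // One append-only Arrow IPC stream on disk; the Mutex serializes writes
    // from concurrent ingest threads, mirroring LocalWriter in the patch.
    let file = File::create("data.records")?;
    let writer = Arc::new(Mutex::new(StreamWriter::try_new(file, &schema)?));

    let mut handles = Vec::new();
    for t in 0..4i64 {
        let writer = Arc::clone(&writer);
        let schema = Arc::clone(&schema);
        handles.push(thread::spawn(move || {
            let columns: Vec<ArrayRef> = vec![
                Arc::new(Int64Array::from(vec![t, t + 1])),
                Arc::new(StringArray::from(vec!["event", "event"])),
            ];
            let batch = RecordBatch::try_new(schema, columns).expect("batch matches schema");
            // Hold the lock only for the duration of one append.
            writer
                .lock()
                .unwrap()
                .write(&batch)
                .expect("append record batch to IPC stream");
        }));
    }
    for handle in handles {
        handle.join().expect("ingest thread panicked");
    }

    // finish() writes the stream footer; a later sync step would convert
    // data.records to parquet and upload it, as move_local_to_temp does.
    writer.lock().unwrap().finish()?;
    Ok(())
}

Serializing appends behind the Mutex is the same trade-off the patch makes, since Arrow's StreamWriter does not support concurrent appends to one file from multiple threads.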