From 2a57d22f1665aa92b657b44e19d1d8663505eff9 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Sun, 14 Aug 2022 19:30:51 +0530 Subject: [PATCH 1/4] Refactor code --- server/src/metadata.rs | 51 ++++++++++++++++++++---------------------- server/src/utils.rs | 5 +++-- 2 files changed, 27 insertions(+), 29 deletions(-) diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 22ba214db..a1cbe57ef 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -18,7 +18,6 @@ use bytes::Bytes; use lazy_static::lazy_static; -use log::warn; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::RwLock; @@ -126,17 +125,28 @@ impl STREAM_INFO { // to load the stream metadata based on whatever is available. // // TODO: ignore failure(s) if any and skip to next stream - let alert_config = parse_string(storage.get_alert(&stream.name).await) - .map_err(|_| Error::AlertNotInStore(stream.name.to_owned()))?; - let schema = parse_string(storage.get_schema(&stream.name).await) - .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned()))?; - let metadata = LogStreamMetadata { - schema, - alert_config, - ..Default::default() + let alert_config = storage + .get_alert(&stream.name) + .await + .and_then(parse_string) + .map_err(|_| Error::AlertNotInStore(stream.name.to_owned())); + + let schema = storage + .get_schema(&stream.name) + .await + .and_then(parse_string) + .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned())); + + let metadata = match (alert_config, schema) { + (Ok(alert_config), Ok(schema)) => LogStreamMetadata { + schema, + alert_config, + ..Default::default() + }, + _ => continue, }; let mut map = self.write().unwrap(); - map.insert(stream.name.to_owned(), metadata); + map.insert(stream.name.clone(), metadata); } Ok(()) @@ -159,21 +169,8 @@ impl STREAM_INFO { } } -fn parse_string(result: Result) -> Result { - let mut string = String::new(); - let bytes = match result { - Ok(bytes) => bytes, - Err(e) => { - warn!("Storage error: {}", e); - return Ok(string); - } - }; - - if !bytes.is_empty() { - string = String::from_utf8(bytes.to_vec())?; - } - - Ok(string) +fn parse_string(bytes: Bytes) -> Result { + String::from_utf8(bytes.to_vec()).map_err(|e| e.into()) } #[cfg(test)] @@ -214,14 +211,14 @@ mod tests { #[case::empty_string("")] fn test_parse_string(#[case] string: String) { let bytes = Bytes::from(string); - assert!(parse_string(Ok(bytes)).is_ok()) + assert!(parse_string(bytes).is_ok()) } #[test] fn test_bad_parse_string() { let bad: Vec = vec![195, 40]; let bytes = Bytes::from(bad); - assert!(parse_string(Ok(bytes)).is_err()); + assert!(parse_string(bytes).is_err()); } #[rstest] diff --git a/server/src/utils.rs b/server/src/utils.rs index b8666b3f6..b631d9e6e 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -118,7 +118,8 @@ pub fn collect_labels(req: &HttpRequest) -> Option { if key.to_string().to_lowercase().starts_with(META_LABEL) { let value = req.headers().get(&key)?.to_str().ok(); let remove_meta_char = format!("{}-", META_LABEL); - let kv = format! {"{}={}", key.to_string().replace(&remove_meta_char.to_string(), ""), value.unwrap()}; + let kv = + format! {"{}={}", key.to_string().replace(&remove_meta_char, ""), value.unwrap()}; labels_vec.push(kv); } } @@ -142,7 +143,7 @@ impl TimePeriod { } pub fn generate_prefixes(&self, prefix: &str) -> Vec { - let prefix = prefix.to_string() + "/"; + let prefix = format!("{}/", prefix); let end_minute = self.end.minute() + if self.end.second() > 0 { 1 } else { 0 }; From b782d52c0171638e8875880cdecd2121b3024d15 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 18 Aug 2022 11:04:25 +0530 Subject: [PATCH 2/4] Abstract over possible failures for object store --- server/src/error.rs | 2 +- server/src/handlers/mod.rs | 2 +- server/src/main.rs | 1 + server/src/metadata.rs | 2 + server/src/option.rs | 17 ++++++++ server/src/s3.rs | 81 +++++++++++++++++++++++++++----------- server/src/storage.rs | 56 ++++++++++++++++++-------- 7 files changed, 120 insertions(+), 41 deletions(-) diff --git a/server/src/error.rs b/server/src/error.rs index 2394a6694..4cd7c90e4 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -33,7 +33,7 @@ pub enum Error { #[error("JSON provided to query api doesn't contain {0}")] JsonQuery(&'static str), #[error("Storage error: {0}")] - Storage(Box), + Storage(ObjectStorageError), #[error("Event error: {0}")] Event(#[from] EventError), #[error("Parquet error: {0}")] diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs index ff2a1ed9d..5c02205ab 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/mod.rs @@ -37,7 +37,7 @@ pub async fn liveness() -> HttpResponse { } pub async fn readiness() -> HttpResponse { - if S3::new().is_available().await { + if let Ok(()) = S3::new().check().await { return HttpResponse::new(StatusCode::OK); } diff --git a/server/src/main.rs b/server/src/main.rs index 1b90f68ad..9be755e71 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -59,6 +59,7 @@ async fn main() -> anyhow::Result<()> { CONFIG.print(); CONFIG.validate(); let storage = S3::new(); + CONFIG.validate_storage(&storage).await; if let Err(e) = metadata::STREAM_INFO.load(&storage).await { warn!("could not populate local metadata. {:?}", e); } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index a1cbe57ef..f328af6cd 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -128,12 +128,14 @@ impl STREAM_INFO { let alert_config = storage .get_alert(&stream.name) .await + .map_err(|e| e.into()) .and_then(parse_string) .map_err(|_| Error::AlertNotInStore(stream.name.to_owned())); let schema = storage .get_schema(&stream.name) .await + .map_err(|e| e.into()) .and_then(parse_string) .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned())); diff --git a/server/src/option.rs b/server/src/option.rs index 74dd5b91f..d2cf85a2b 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -23,6 +23,7 @@ use structopt::StructOpt; use crate::banner; use crate::s3::S3Config; +use crate::storage::{ObjectStorage, ObjectStorageError}; lazy_static::lazy_static! { #[derive(Debug)] @@ -72,6 +73,22 @@ impl Config { } } + pub async fn validate_storage(&self, storage: &impl ObjectStorage) { + match storage.check().await { + Ok(_) => (), + Err(ObjectStorageError::NoSuchBucket(name)) => panic!( + "Could not start because the bucket named {bucket} doesn't exist. Please make sure bucket with the name {bucket} exists on {url} and then start parseable again", + bucket = name, + url = self.storage.endpoint_url() + ), + Err(ObjectStorageError::ConnectionError(inner)) => panic!( + "Failed to connect to the Object Storage Service\nCaused by: {}", + inner + ), + Err(error) => { panic!("{error}") } + } + } + fn status_info(&self, scheme: &str) { let url = format!("{}://{}", scheme, CONFIG.parseable.address).underlined(); eprintln!( diff --git a/server/src/s3.rs b/server/src/s3.rs index 24880ec82..47da6724b 100644 --- a/server/src/s3.rs +++ b/server/src/s3.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; +use aws_sdk_s3::error::{HeadBucketError, HeadBucketErrorKind}; use aws_sdk_s3::model::{Delete, ObjectIdentifier}; -use aws_sdk_s3::types::ByteStream; +use aws_sdk_s3::types::{ByteStream, SdkError}; use aws_sdk_s3::Error as AwsSdkError; use aws_sdk_s3::{Client, Credentials, Endpoint, Region}; use aws_types::credentials::SharedCredentialsProvider; @@ -18,7 +19,6 @@ use std::sync::Arc; use structopt::StructOpt; use tokio_stream::StreamExt; -use crate::error::Error; use crate::metadata::Stats; use crate::option::{StorageOpt, CONFIG}; use crate::query::Query; @@ -96,14 +96,6 @@ impl StorageOpt for S3Config { } } -impl ObjectStorageError for AwsSdkError {} - -impl From for Error { - fn from(e: AwsSdkError) -> Self { - Self::Storage(Box::new(e)) - } -} - struct S3Options { endpoint: Endpoint, region: Region, @@ -304,70 +296,83 @@ impl S3 { #[async_trait] impl ObjectStorage for S3 { - async fn is_available(&self) -> bool { + async fn check(&self) -> Result<(), ObjectStorageError> { self.client .head_bucket() .bucket(&S3_CONFIG.s3_bucket_name) .send() .await - .is_ok() + .map(|_| ()) + .map_err(|err| err.into()) } - async fn put_schema(&self, stream_name: String, body: String) -> Result<(), Error> { + async fn put_schema( + &self, + stream_name: String, + body: String, + ) -> Result<(), ObjectStorageError> { self._put_schema(stream_name, body).await?; Ok(()) } - async fn create_stream(&self, stream_name: &str) -> Result<(), Error> { + async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { self._create_stream(stream_name).await?; Ok(()) } - async fn delete_stream(&self, stream_name: &str) -> Result<(), Error> { + async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> { self._delete_stream(stream_name).await?; Ok(()) } - async fn create_alert(&self, stream_name: &str, body: String) -> Result<(), Error> { + async fn create_alert( + &self, + stream_name: &str, + body: String, + ) -> Result<(), ObjectStorageError> { self._create_alert(stream_name, body).await?; Ok(()) } - async fn get_schema(&self, stream_name: &str) -> Result { + async fn get_schema(&self, stream_name: &str) -> Result { let body_bytes = self._get_schema(stream_name).await?; Ok(body_bytes) } - async fn get_alert(&self, stream_name: &str) -> Result { + async fn get_alert(&self, stream_name: &str) -> Result { let body_bytes = self._alert_exists(stream_name).await?; Ok(body_bytes) } - async fn get_stats(&self, stream_name: &str) -> Result { + async fn get_stats(&self, stream_name: &str) -> Result { let stats = serde_json::from_slice(&self._get_stats(stream_name).await?)?; Ok(stats) } - async fn list_streams(&self) -> Result, Error> { + async fn list_streams(&self) -> Result, ObjectStorageError> { let streams = self._list_streams().await?; Ok(streams) } - async fn upload_file(&self, key: &str, path: &str) -> Result<(), Error> { + async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError> { self._upload_file(key, path).await?; Ok(()) } - async fn query(&self, query: &Query, results: &mut Vec) -> Result<(), Error> { + async fn query( + &self, + query: &Query, + results: &mut Vec, + ) -> Result<(), ObjectStorageError> { let s3_file_system = Arc::new( S3FileSystem::new( Some(SharedCredentialsProvider::new(self.options.creds.clone())), @@ -397,9 +402,39 @@ impl ObjectStorage for S3 { // execute the query and collect results let df = ctx.sql(query.query.as_str()).await?; - results.extend(df.collect().await.map_err(Error::DataFusion)?); + results.extend(df.collect().await?); } Ok(()) } } + +impl From for ObjectStorageError { + fn from(error: AwsSdkError) -> Self { + ObjectStorageError::UnhandledError(error.into()) + } +} + +impl From> for ObjectStorageError { + fn from(error: SdkError) -> Self { + match error { + SdkError::ServiceError { + err: + HeadBucketError { + kind: HeadBucketErrorKind::NotFound(_), + .. + }, + .. + } => ObjectStorageError::NoSuchBucket(S3_CONFIG.bucket_name().to_string()), + SdkError::DispatchFailure(err) => ObjectStorageError::ConnectionError(err.into()), + SdkError::TimeoutError(err) => ObjectStorageError::ConnectionError(err), + err => ObjectStorageError::UnhandledError(err.into()), + } + } +} + +impl From for ObjectStorageError { + fn from(error: serde_json::Error) -> Self { + ObjectStorageError::UnhandledError(error.into()) + } +} diff --git a/server/src/storage.rs b/server/src/storage.rs index c4139f209..c31e01744 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -16,7 +16,6 @@ * */ -use crate::error::Error; use crate::metadata::Stats; use crate::option::CONFIG; use crate::query::Query; @@ -28,7 +27,7 @@ use chrono::{Duration, Timelike, Utc}; use datafusion::arrow::record_batch::RecordBatch; use serde::Serialize; -use std::fmt::{Debug, Display}; +use std::fmt::Debug; use std::fs; use std::io; use std::iter::Iterator; @@ -36,7 +35,6 @@ use std::path::Path; extern crate walkdir; use walkdir::WalkDir; -pub trait ObjectStorageError: Display + Debug {} /// local sync interval to move data.parquet to /tmp dir of that stream. /// 60 sec is a reasonable value. @@ -48,18 +46,24 @@ pub const OBJECT_STORE_DATA_GRANULARITY: u32 = (LOCAL_SYNC_INTERVAL as u32) / 60 #[async_trait] pub trait ObjectStorage: Sync + 'static { - async fn is_available(&self) -> bool; - async fn put_schema(&self, stream_name: String, body: String) -> Result<(), Error>; - async fn create_stream(&self, stream_name: &str) -> Result<(), Error>; - async fn delete_stream(&self, stream_name: &str) -> Result<(), Error>; - async fn create_alert(&self, stream_name: &str, body: String) -> Result<(), Error>; - async fn get_schema(&self, stream_name: &str) -> Result; - async fn get_alert(&self, stream_name: &str) -> Result; - async fn get_stats(&self, stream_name: &str) -> Result; - async fn list_streams(&self) -> Result, Error>; - async fn upload_file(&self, key: &str, path: &str) -> Result<(), Error>; - async fn query(&self, query: &Query, results: &mut Vec) -> Result<(), Error>; - async fn local_sync(&self) -> Result<(), Error> { + async fn check(&self) -> Result<(), ObjectStorageError>; + async fn put_schema(&self, stream_name: String, body: String) + -> Result<(), ObjectStorageError>; + async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; + async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; + async fn create_alert(&self, stream_name: &str, body: String) + -> Result<(), ObjectStorageError>; + async fn get_schema(&self, stream_name: &str) -> Result; + async fn get_alert(&self, stream_name: &str) -> Result; + async fn get_stats(&self, stream_name: &str) -> Result; + async fn list_streams(&self) -> Result, ObjectStorageError>; + async fn upload_file(&self, key: &str, path: &str) -> Result<(), ObjectStorageError>; + async fn query( + &self, + query: &Query, + results: &mut Vec, + ) -> Result<(), ObjectStorageError>; + async fn local_sync(&self) -> io::Result<()> { // If the local data path doesn't exist yet, return early. // This method will be called again after next ticker interval if !Path::new(&CONFIG.parseable.local_disk_path).exists() { @@ -103,7 +107,7 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } - async fn s3_sync(&self) -> Result<(), Error> { + async fn s3_sync(&self) -> Result<(), ObjectStorageError> { if !Path::new(&CONFIG.parseable.local_disk_path).exists() { return Ok(()); } @@ -228,3 +232,23 @@ impl StorageSync { } } } + +#[derive(Debug, thiserror::Error)] +pub enum ObjectStorageError { + #[error("Bucket {0} not found")] + NoSuchBucket(String), + #[error("Connection Error: {0}")] + ConnectionError(Box), + #[error("IO Error: {0}")] + IoError(#[from] std::io::Error), + #[error("DataFusion Error: {0}")] + DataFusionError(#[from] datafusion::error::DataFusionError), + #[error("Unhandled Error: {0}")] + UnhandledError(Box), +} + +impl From for crate::error::Error { + fn from(e: ObjectStorageError) -> Self { + crate::error::Error::Storage(e) + } +} From ca91dbf3e89f42320012ecd98bc96e964d14ca52 Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 18 Aug 2022 13:55:04 +0530 Subject: [PATCH 3/4] use default --- server/src/metadata.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/server/src/metadata.rs b/server/src/metadata.rs index f328af6cd..b35f07804 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -139,14 +139,12 @@ impl STREAM_INFO { .and_then(parse_string) .map_err(|_| Error::SchemaNotInStore(stream.name.to_owned())); - let metadata = match (alert_config, schema) { - (Ok(alert_config), Ok(schema)) => LogStreamMetadata { - schema, - alert_config, - ..Default::default() - }, - _ => continue, + let metadata = LogStreamMetadata { + schema: schema.unwrap_or_default(), + alert_config: alert_config.unwrap_or_default(), + ..Default::default() }; + let mut map = self.write().unwrap(); map.insert(stream.name.clone(), metadata); } From 3c5e949e3c951fe1a81dba3a2ab73fd8d60f944d Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Thu, 18 Aug 2022 14:06:36 +0530 Subject: [PATCH 4/4] Change panic message --- server/src/option.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index d2cf85a2b..a14942ce6 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -77,13 +77,14 @@ impl Config { match storage.check().await { Ok(_) => (), Err(ObjectStorageError::NoSuchBucket(name)) => panic!( - "Could not start because the bucket named {bucket} doesn't exist. Please make sure bucket with the name {bucket} exists on {url} and then start parseable again", + "Could not start because the bucket doesn't exist. Please ensure bucket {bucket} exists on {url}", bucket = name, url = self.storage.endpoint_url() ), Err(ObjectStorageError::ConnectionError(inner)) => panic!( - "Failed to connect to the Object Storage Service\nCaused by: {}", - inner + "Failed to connect to the Object Storage Service on {url}\nCaused by: {cause}", + url = self.storage.endpoint_url(), + cause = inner ), Err(error) => { panic!("{error}") } }